Skip to main content
Glama

MCP Server Starter

All files code.txt133 kB
================= All files code: This text file consist of codes from all files from this project - easy to put to ur generative AI. ================= dbs.yaml (root folder): ``` #---------------------------------------------------------------------------- # Cloud Database (With no direct values - Values from ENV VARS) #---------------------------------------------------------------------------- databases: # Railway MySQL - alias: ${MYSQL_ALIAS:mysql} dialect: mysql host: ${MYSQL_HOST} port: ${MYSQL_PORT:12498} user: ${MYSQL_USER} password: ${MYSQL_PASSWORD} database: ${MYSQL_DB} # Railway PostgreSQL - alias: ${PG_ALIAS:pg} dialect: pg host: ${PG_HOST} port: ${PG_PORT:55537} user: ${PG_USER} password: ${PG_PASSWORD} database: ${PG_DB} # Azure SQL Server - alias: ${MSSQL_ALIAS:mssql} dialect: mssql host: ${MSSQL_HOST} port: ${MSSQL_PORT:1433} user: ${MSSQL_USER} password: ${MSSQL_PASSWORD} database: ${MSSQL_DB} # Cloud Oracle - Docker to the Azure Container - alias: ${ORACLE_ALIAS:oracle} dialect: oracle connectString: ${ORACLE_CONNECT_STRING} user: ${ORACLE_USER} password: ${ORACLE_PASSWORD} ``` .env (root folder): ``` #---------------------------------------------------------------------------- # DIFFERENT DATABASE CONFIGURATION EXAMPLES #---------------------------------------------------------------------------- # 1) SQLite # DB_PROVIDER=sqlite # SQLITE_PATH=./sample.db # 2) PostgreSQL # DB_PROVIDER=postgres # DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/appdb #postgres://USER:PASSWORD@HOST:5432/DBNAME # 3) MySQL / MariaDB # DB_PROVIDER=mysql # DATABASE_URL=mysql://root:rootpass@127.0.0.1:3306/appdb #mysql://USER:PASSWORD@HOST:3306/DBNAME # 4) SQL Server / Azure SQL # DB_PROVIDER=mssql # DATABASE_URL=Server=localhost,1433;Database=appdb;User Id=sa;Password=Passw0rd!;Encrypt=true;TrustServerCertificate=true # Example (keep commented): # Server=HOST,1433;Database=DB;User Id=USER;Password=PASS;Encrypt=true # 5) Oracle # DB_PROVIDER=oracle # DATABASE_URL=USER/PASSWORD@HOST:1521/DBNAME PROJECT_ENDPOINT= AZURE_AI_KEY= MODEL_DEPLOYMENT_NAME= MCP_SERVER_URL= ``` package.json: ``` { "name": "mcp-server-starter", "version": "1.0.0", "type": "module", "scripts": { "seed": "tsx src/db/seed.ts", "dev:server": "tsx src/server/stdio.ts", "dev:client": "tsx src/client/devClient.ts", "build": "tsc", "start": "node dist/server/http.js", "postinstall": "npm run build", "db:ping": "tsx dev/ping.ts", "db:up": "docker-compose up -d", "db:down": "docker-compose down", "dev:http": "tsx src/server/http.ts" }, "engines": { "node": ">=20 <21" }, "dependencies": { "@modelcontextprotocol/sdk": "^1.17.5", "better-sqlite3": "^12.2.0", "dotenv": "^17.2.1", "js-yaml": "^4.1.0", "mssql": "^11.0.1", "mysql2": "^3.14.4", "oracledb": "^6.9.0", "pg": "^8.16.3", "sqlite3": "^5.1.7", "zod": "^3.25.76" }, "devDependencies": { "@types/better-sqlite3": "^7.6.13", "@types/express": "^5.0.3", "@types/js-yaml": "^4.0.9", "@types/mssql": "^9.1.7", "@types/node": "^24.3.1", "@types/oracledb": "^6.9.1", "@types/pg": "^8.15.5", "and": "^0.0.3", "express": "^5.1.0", "i": "^0.3.7", "npm": "^11.6.0", "tsx": "^4.20.5", "typescript": "^5.9.2" } } ``` client/devClient.ts ``` import "dotenv/config"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; import { ListToolsResultSchema, CallToolResultSchema, } from "@modelcontextprotocol/sdk/types.js"; async function main() { // Spawn the server via tsx for dev convenience const transport = new StdioClientTransport({ command: process.platform === "win32" ? "npx.cmd" : "npx", args: ["tsx", "src/server/stdio.ts"], }); const client = new Client({ name: "dev-client", version: "0.1.0" }); await client.connect(transport); // === tools/list === const toolsResp = await client.request( { method: "tools/list", params: {} }, ListToolsResultSchema ); const toolNames = toolsResp.tools.map(t => t.name); console.log("Available tools:", toolNames); // --- NEW: auto-detect alias (optionally honoring DEV_DB_ALIAS) const preferred = process.env.DEV_DB_ALIAS?.trim(); const alias = pickAlias(toolNames, preferred); if (!alias) { throw new Error( "No namespaced SQL tools found (expected '<alias>.sql.schema'). " + "Check your dbs.yaml and environment." ); } console.log("Using DB alias:", alias, preferred ? `(preferred=${preferred})` : ""); // === tools/call: <alias>.sql.schema === const schemaRes = await client.request( { method: "tools/call", params: { name: `${alias}.sql.schema`, arguments: {}, }, }, CallToolResultSchema ); console.log( "\n=== sql.schema ===\n", schemaRes.content?.[0]?.type === "text" ? schemaRes.content[0].text : schemaRes ); // === tools/call: <alias>.sql.peek === const peekRes = await client.request( { method: "tools/call", params: { name: `${alias}.sql.peek`, arguments: { maxRowsPerTable: 50, // adjust if needed as: "json", }, }, }, CallToolResultSchema ); console.log( "\n=== sql.peek ===\n", peekRes.content?.[0]?.type === "text" ? peekRes.content[0].text : JSON.stringify(peekRes, null, 2) ); // === tools/call: <alias>.sql.query === const sample = detectSampleQuery(); const queryRes = await client.request( { method: "tools/call", params: { name: `${alias}.sql.query`, arguments: { sql: sample.text, params: sample.params, readOnly: true, rowLimit: 10, as: "json", }, }, }, CallToolResultSchema ); console.log( "\n=== sql.query ===\n", queryRes.content?.[0]?.type === "text" ? queryRes.content[0].text : queryRes ); await client.close(); } // --- NEW: helper to pick a valid alias from tools/list (with optional preferred) function pickAlias(names: string[], preferred?: string | null): string | null { const aliases = Array.from(new Set(names.map(n => n.split(".")[0]))); const hasSchema = (a: string) => names.includes(`${a}.sql.schema`); if (preferred && aliases.includes(preferred) && hasSchema(preferred)) { return preferred; } const first = aliases.find(hasSchema) ?? null; if (!first) { console.warn("No alias exposes '.sql.schema'. Found aliases:", aliases); } return first; } function detectSampleQuery() { const provider = (process.env.DB_PROVIDER ?? "sqlite").toLowerCase(); if (provider.includes("oracle")) return { text: "SELECT 1 AS one FROM dual", params: {} }; return { text: "SELECT 1 AS one", params: {} }; } main().catch((err) => { console.error("[dev-client] error:", err); process.exit(1); }); ``` client/agent.py: ``` # agent_mcp_cmd.py """ CMD-based demo: Azure AI Foundry Agent <-> MCP Server (HTTP) - Prompts for username/password and verifies against your Cloud DB (MySQL/Postgres). - On successful login, binds X-Role and X-User-Id to MCP HTTP session. - Creates an Azure Agent with function tools that bridge to MCP tools. - Lets the user chat; agent calls MCP tools to query your DBs (per RBAC/policies). - Typing 'q' or 'quit' deletes the agent and closes the MCP session cleanly. Requirements: pip install python-dotenv requests azure-identity azure-ai-agents # plus one of: pip install mysql-connector-python # if using MySQL for login pip install psycopg2-binary # if using Postgres for login """ import os import sys import json import time import getpass from typing import Any, Dict, Optional, Tuple, Callable, Set import requests from dotenv import load_dotenv from azure.identity import DefaultAzureCredential from azure.ai.agents import AgentsClient from azure.ai.agents.models import ( FunctionTool, RequiredFunctionToolCall, ToolOutput, SubmitToolOutputsAction, ListSortOrder, ) # ---------- Load env ---------- load_dotenv() PROJECT_ENDPOINT = os.environ["PROJECT_ENDPOINT"] MODEL_DEPLOYMENT_NAME = os.environ["MODEL_DEPLOYMENT_NAME"] MCP_SERVER_URL = os.environ["MCP_SERVER_URL"].rstrip("/") if not PROJECT_ENDPOINT or not MODEL_DEPLOYMENT_NAME or not MCP_SERVER_URL: print("❌ Missing env: PROJECT_ENDPOINT, MODEL_DEPLOYMENT_NAME, MCP_SERVER_URL") sys.exit(1) # Reduce noisy logs unless debugging os.environ.setdefault("AZURE_LOG_LEVEL", "warning") # ---------- DB Login (MySQL/Postgres) ---------- def db_login_loop() -> Tuple[str, str, str]: """ Repeatedly prompt the user for username/password and verify against your DB. Returns: (role, user_id, username) for the authenticated user. """ # Config for auth table/columns (override by env if needed) table = os.environ.get("AUTH_TABLE", "users") col_user = os.environ.get("AUTH_USER_COL", "username") col_pass = os.environ.get("AUTH_PASS_COL", "password") col_role = os.environ.get("AUTH_ROLE_COL", "role") col_userid = os.environ.get("AUTH_ID_COL", "user_id") # Dialect from env hints (prefer explicit) dialect = (os.environ.get("DB_PROVIDER") or os.environ.get("DB_DIALECT") or "").lower() if not dialect: # Derive from presence of connection envs dialect = "mysql" if os.environ.get("MYSQL_HOST") else ("pg" if os.environ.get("PG_HOST") else "") if dialect not in ("mysql", "pg"): print("❌ No DB_PROVIDER (mysql|pg) set and no MYSQL_HOST/PG_HOST present.") print(" Please configure your login DB connection envs.") sys.exit(1) print(f"[login] Using {dialect.upper()} for credential verification.") while True: username = input("Login username: ").strip() # Use getpass for password masking in CMD password = getpass.getpass("Login password: ").strip() try: if dialect == "mysql": import mysql.connector # mysql-connector-python conn = mysql.connector.connect( host=os.environ["MYSQL_HOST"], port=int(os.environ.get("MYSQL_PORT", "3306")), user=os.environ["MYSQL_USER"], password=os.environ["MYSQL_PASSWORD"], database=os.environ["MYSQL_DB"], ) sql = f""" SELECT {col_role}, {col_userid} FROM {table} WHERE {col_user} = %s AND {col_pass} = %s LIMIT 1 """ with conn.cursor() as cur: cur.execute(sql, (username, password)) row = cur.fetchone() conn.close() else: # Postgres import psycopg2 # psycopg2-binary conn = psycopg2.connect( host=os.environ["PG_HOST"], port=int(os.environ.get("PG_PORT", "5432")), user=os.environ["PG_USER"], password=os.environ["PG_PASSWORD"], dbname=os.environ["PG_DB"], ) sql = f""" SELECT {col_role}, {col_userid} FROM {table} WHERE {col_user} = %s AND {col_pass} = %s LIMIT 1 """ with conn.cursor() as cur: cur.execute(sql, (username, password)) row = cur.fetchone() conn.close() if not row: print("⚠️ Invalid credentials. Please try again.\n") continue role, user_id = str(row[0]), str(row[1]) print(f"[login] Authenticated. role={role} user_id={user_id}") return role, user_id, username except Exception as ex: print(f"❌ DB error: {ex}") print(" Please verify your DB env and try again.\n") time.sleep(0.8) # ---------- Minimal MCP HTTP client ---------- class McpHttpClient: """ Speaks JSON-RPC over your MCP HTTP endpoint (/mcp). - POST initialize → receives mcp-session-id in response headers. - Subsequent requests carry mcp-session-id + X-Role + X-User-Id. - Tools are invoked by 'tools/call' with {name, arguments}. Server-side behavior referenced from your http.ts and tools/sql code. # MCP server returns mcp-session-id in headers; expects X-Role/X-User-Id. [1](https://enfrasysconsulting-my.sharepoint.com/personal/muhammad_idzhans_enfrasys_com/Documents/Microsoft%20Copilot%20Chat%20Files/All%20files%20code.txt) """ def __init__(self, url: str): self.url = url.rstrip("/") self.sid: Optional[str] = None self.headers: Dict[str, str] = { "Content-Type": "application/json", # Streamable HTTP supports JSON or SSE responses; accept both: "Accept": "application/json, text/event-stream", } def set_identity(self, role: str, user_id: str): self.headers["x-role"] = role # RBAC / alias allowlist from policies.yaml [1](https://enfrasysconsulting-my.sharepoint.com/personal/muhammad_idzhans_enfrasys_com/Documents/Microsoft%20Copilot%20Chat%20Files/All%20files%20code.txt) self.headers["x-user-id"] = user_id # row filters use :user_id injected via userContext [1](https://enfrasysconsulting-my.sharepoint.com/personal/muhammad_idzhans_enfrasys_com/Documents/Microsoft%20Copilot%20Chat%20Files/All%20files%20code.txt) def _post(self, payload: Dict[str, Any]) -> requests.Response: return requests.post(self.url, headers=self.headers, data=json.dumps(payload), timeout=60) @staticmethod def _parse_mcp_response(text: str) -> Dict[str, Any]: """ Supports either plain JSON or SSE ('event: message\\ndata: {...}\\n\\n'). """ t = text.strip() if t.startswith("event:"): lines = t.splitlines() data_lines = [ln for ln in lines if ln.startswith("data:")] if not data_lines: raise ValueError(f"No 'data:' block in SSE: {t[:200]}...") payload = data_lines[-1][len("data: "):] return json.loads(payload) return json.loads(t) def initialize(self): payload = { "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": { "protocolVersion": "2025-03-26", "clientInfo": {"name": "agents-bridge-client", "version": "1.0.0"}, "capabilities": {"roots": {"listChanged": True}, "sampling": {}, "tools": {}} } } r = self._post(payload) r.raise_for_status() sid = r.headers.get("mcp-session-id") if not sid: raise RuntimeError("MCP server did not return mcp-session-id header.") self.sid = sid def ready(self): assert self.sid, "Call initialize() first" self.headers["mcp-session-id"] = self.sid payload = {"jsonrpc": "2.0", "method": "notifications/initialized"} # server does not require body parsing here self._post(payload) def tools_call(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str: """ Execute a tool and return a single text result for easier display. Your server returns content=[{type:'text'|'json'}]; we stringify to text. Tool names include discovery (db.aliases/types/names/listByType) and namespaced SQL tools: "<alias>.sql.schema|peek|query". [1](https://enfrasysconsulting-my.sharepoint.com/personal/muhammad_idzhans_enfrasys_com/Documents/Microsoft%20Copilot%20Chat%20Files/All%20files%20code.txt) """ assert self.sid, "Call initialize() first" args = arguments if arguments is not None else {} payload = { "jsonrpc": "2.0", "id": "call-1", "method": "tools/call", "params": {"name": name, "arguments": args} } r = self._post(payload) r.raise_for_status() obj = self._parse_mcp_response(r.text) result = obj.get("result") or {} content = result.get("content") or [] if not content: return "[]" item = content[0] ctype = item.get("type") if ctype == "text": return item.get("text", "") if ctype == "json": try: return json.dumps(item.get("json"), ensure_ascii=False) except Exception: return str(item.get("json")) return json.dumps(obj, ensure_ascii=False) def close_session(self): """ Cleanly delete the server-side MCP session. """ if not self.sid: return try: requests.delete(self.url, headers=self.headers, timeout=30) except Exception: pass self.sid = None # ---------- Function tools (Agent -> MCP bridge) ---------- def build_function_tools(mcp: McpHttpClient) -> FunctionTool: """ Expose a small set of functions the Agent can call. These map to your MCP tools and keep the agent general-purpose. """ def db_aliases() -> str: return mcp.tools_call("db.aliases", {}) def db_types() -> str: return mcp.tools_call("db.types", {}) def db_names() -> str: return mcp.tools_call("db.names", {}) def db_list_by_type(type: str, unique: bool = True, includeAliases: bool = False) -> str: args = {"type": type, "unique": unique, "includeAliases": includeAliases} return mcp.tools_call("db.listByType", args) def sql_schema(alias: str) -> str: return mcp.tools_call(f"{alias}.sql.schema", {}) def sql_peek(alias: str, maxRowsPerTable: int = 50, as_: str = "markdown") -> str: args = {"maxRowsPerTable": maxRowsPerTable, "as": as_} return mcp.tools_call(f"{alias}.sql.peek", args) def sql_query(alias: str, sql: str, params: Optional[dict] = None, readOnly: bool = True, rowLimit: int = 1000, as_: str = "json") -> str: args = {"sql": sql, "params": params or {}, "readOnly": readOnly, "rowLimit": rowLimit, "as": as_} return mcp.tools_call(f"{alias}.sql.query", args) USER_FUNCTIONS: Set[Callable[..., Any]] = { db_aliases, db_types, db_names, db_list_by_type, sql_schema, sql_peek, sql_query } return FunctionTool(functions=USER_FUNCTIONS) # ---------- Azure Agent run helpers ---------- TERMINAL_STATES = {"completed", "failed", "expired", "cancelled"} def normalize_status(run) -> str: s = getattr(run, "status", None) if s is None: return "" for attr in ("value", "name"): if hasattr(s, attr): try: return str(getattr(s, attr)).lower() except Exception: pass return str(s).lower() def poll_until_terminal(client: AgentsClient, thread_id: str, run_id: str, interval: float = 1.0): last_status = None while True: run = client.runs.get(thread_id=thread_id, run_id=run_id) status = normalize_status(run) if status != last_status: print(f"[debug] run status -> {status}") last_status = status if status in TERMINAL_STATES: return run # Tool bridge if "requires_action" in status and isinstance(getattr(run, "required_action", None), SubmitToolOutputsAction): tool_calls = run.required_action.submit_tool_outputs.tool_calls outputs = [] for tc in tool_calls: print(f"[debug] tool_call: name={getattr(tc,'name','?')} args={getattr(tc,'arguments',{})}") if isinstance(tc, RequiredFunctionToolCall): try: # Execute locally defined FunctionTool out = FUNCTIONS.execute(tc) except Exception as ex: out = f"ERROR executing '{getattr(tc,'name','?')}': {ex}" outputs.append(ToolOutput(tool_call_id=tc.id, output=out)) if outputs: client.runs.submit_tool_outputs(thread_id=thread_id, run_id=run_id, tool_outputs=outputs) time.sleep(interval) # ---------- Main ---------- def main(): # 1) Login and bind identity to MCP role, user_id, username = db_login_loop() mcp = McpHttpClient(url=MCP_SERVER_URL) mcp.set_identity(role=role, user_id=user_id) # identity headers required by your server [1](https://enfrasysconsulting-my.sharepoint.com/personal/muhammad_idzhans_enfrasys_com/Documents/Microsoft%20Copilot%20Chat%20Files/All%20files%20code.txt) mcp.initialize() # POST initialize → mcp-session-id header returned by your server [1](https://enfrasysconsulting-my.sharepoint.com/personal/muhammad_idzhans_enfrasys_com/Documents/Microsoft%20Copilot%20Chat%20Files/All%20files%20code.txt) mcp.ready() global FUNCTIONS FUNCTIONS = build_function_tools(mcp) # 2) Azure Agents client agents_client = AgentsClient( endpoint=PROJECT_ENDPOINT, credential=DefaultAzureCredential( exclude_environment_credential=True, exclude_managed_identity_credential=True, ), ) # 3) Create agent with instructions tailored to your RBAC + row filters # Ensures "my" resolves to the logged-in user without follow-up questions. default_alias_hint = { "customer": "customer_db", "customer_admin": "customer_db", "merchant": "merchant_db", "merchant_admin": "merchant_db", }.get(role, None) # instructions = f""" # You are assisting a signed-in user. # - username: {username} # - role: {role} # - user_id: {user_id} # Behavior: # - Treat "my ..." as referring to user_id={user_id}. # - Do NOT ask who the user is; you already know. # - Use SQL tools via the provided functions (sql_schema / sql_peek / sql_query). # - Prefer named parameters (e.g., :user_id) and small result sets. # - If role is not 'admin', avoid discovery tools unless needed; rely on default alias. # - Default alias: {default_alias_hint or "(none)"} (use this unless the user explicitly chooses another allowed alias). # - Examples the user may ask: # "What is my current total amount of points?" # → Call sql_schema (once if needed), then sql_query on the default alias with a SELECT that aggregates from the relevant table(s), # using :user_id and LIMIT/TOP/ROWNUM as appropriate for the dialect. # Important: # - Your access is scoped by the server using X-Role and X-User-Id headers. # - If a query is rejected, adjust to allowed tables or apply row filters (e.g., WHERE user_id = :user_id). # """.strip() instructions = f""" You are assisting a signed-in user. - username: {username} - role: {role} - user_id: {user_id} Identity & pronouns - Treat any phrase like “my points”, “my purchases”, “my account” as referring to user_id={user_id}. - Do NOT ask the user who they are; you already know from the session headers. Alias selection - Default to alias **customer_db** for any question about the user’s account, points, or purchase history. - Use alias **merchant_db** ONLY when the user wants to browse or ask about items/products (catalog browsing). Allowed tables (customer role) - In **customer_db**: you may query ONLY these tables: `users`, `purchase_history`, `points_history`. - In **merchant_db**: you may query ONLY the `items` table. - If you attempt a table outside these lists, adjust your plan to an allowed table and try again. Tool usage - Prefer `<alias>.sql.query` for answers. Call `<alias>.sql.schema` once if you need to confirm the exact column names. - Do NOT use `sql.peek` for customer questions. - Discovery tools (db.aliases/types/names) are unnecessary; you already know which aliases to use for this role. SQL rules - Use **read-only SELECT** statements with **named parameters** (e.g., `:user_id`, `:limit`). - Keep results small. Always include a limit (LIMIT / TOP / ROWNUM depending on dialect). - For personal data in **customer_db**, ALWAYS include a `WHERE user_id = :user_id` filter. - For **merchant_db.items**, no user filter is required unless specified (e.g., `WHERE is_active = 1`). Examples - “What is my current total amount of points?” → alias=customer_db; query points from `points_history` with `WHERE user_id = :user_id`. Example (generic): SELECT SUM(points) AS total_points FROM points_history WHERE user_id = :user_id; - “Show my last 5 purchases.” → alias=customer_db; query `purchase_history` filtered by user and ordered by recency. Example: SELECT purchase_id, item_id, total_price, purchase_date FROM purchase_history WHERE user_id = :user_id ORDER BY purchase_date DESC LIMIT :limit; (set :limit = 5) - “List available items.” → alias=merchant_db; query `items` and return a concise list with name/price/availability. Example: SELECT item_id, name, price, availability_status FROM items WHERE is_active = 1 ORDER BY name ASC LIMIT :limit; (e.g., :limit = 10) Error handling - If a call fails with a policy/permission error, switch to the allowed alias/table and add required filters (e.g., `user_id = :user_id`), then retry. Response style - Return concise answers with the computed values (e.g., the total points number) and a short summary. Avoid exposing raw SQL unless the user asks for it. """.strip() with agents_client: agent = agents_client.create_agent( model=MODEL_DEPLOYMENT_NAME, name="mcp-sql-agent", instructions=instructions, tools=FUNCTIONS.definitions, ) print(f"Agent created: {agent.id}") thread = agents_client.threads.create() print(f"Thread created: {thread.id}") try: while True: prompt = input("\nAsk something (or 'quit'/'q'): ").strip() if prompt.lower() in ("quit", "q", "exit"): break agents_client.messages.create(thread_id=thread.id, role="user", content=prompt) run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id) run = poll_until_terminal(agents_client, thread.id, run.id) # Show conversation as simple alternating blocks try: msgs = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING) print("\nConversation:") print("=" * 80) for m in msgs: if m.text_messages: for tm in m.text_messages: print(f"{m.role.upper()}: {tm.text.value}\n") print("=" * 80) except Exception as e: print("⚠️ Could not list messages:", e) finally: # Cleanup: delete agent and close MCP session try: agents_client.delete_agent(agent.id) print(f"Deleted agent: {agent.id}") except Exception: pass try: mcp.close_session() print("Closed MCP session.") except Exception: pass if __name__ == "__main__": main() ``` client/client.py: ``` # client.py import os import json import time import requests from typing import Any, Dict, Optional, Set, Callable, Tuple from dotenv import load_dotenv from azure.identity import DefaultAzureCredential from azure.ai.agents import AgentsClient from azure.ai.agents.models import ( FunctionTool, SubmitToolOutputsAction, ToolOutput, RequiredFunctionToolCall, ListSortOrder, ) # ========== Load env ========== load_dotenv() PROJECT_ENDPOINT = os.environ["PROJECT_ENDPOINT"] MODEL_DEPLOYMENT_NAME = os.environ["MODEL_DEPLOYMENT_NAME"] MCP_SERVER_URL = os.environ["MCP_SERVER_URL"].rstrip("/") # Verbose logs (optional) os.environ.setdefault("AZURE_LOG_LEVEL", "warning") # ========== Railway DB login (role + user_id) ========== def railway_login() -> Tuple[str, str]: """ Returns (role, user_id) for the current user by querying your Railway DB. You can set credentials for MySQL or Postgres via env vars. Defaults: - table: users - columns: username, password, role, user_id Prompts at runtime for username/password. """ # Prompt user username = input("Login username: ").strip() password = input("Login password: ").strip() # Config (override via env if your schema differs) table = os.environ.get("AUTH_TABLE", "users") col_user = os.environ.get("AUTH_USER_COL", "username") col_pass = os.environ.get("AUTH_PASS_COL", "password") col_role = os.environ.get("AUTH_ROLE_COL", "role") col_userid = os.environ.get("AUTH_ID_COL", "user_id") # Determine DB type from env (mysql | pg) dialect = (os.environ.get("DB_PROVIDER") or os.environ.get("DB_DIALECT") or "").lower() if not dialect: # fallback: auto if MYSQL_HOST present -> mysql, elif PG_HOST -> pg dialect = "mysql" if os.environ.get("MYSQL_HOST") else ("pg" if os.environ.get("PG_HOST") else "") if dialect not in ("mysql", "pg"): print("[login] No DB_PROVIDER set (mysql|pg). Using default role='admin', user_id='test_user'.") return ("admin", "test_user") try: if dialect == "mysql": import mysql.connector # mysql-connector-python conn = mysql.connector.connect( host=os.environ["MYSQL_HOST"], port=int(os.environ.get("MYSQL_PORT", "3306")), user=os.environ["MYSQL_USER"], password=os.environ["MYSQL_PASSWORD"], database=os.environ["MYSQL_DB"], ) # NOTE: In production use hashed passwords; this demo assumes plain text sql = f""" SELECT {col_role}, {col_userid} FROM {table} WHERE {col_user} = %s AND {col_pass} = %s LIMIT 1 """ with conn.cursor() as cur: cur.execute(sql, (username, password)) row = cur.fetchone() conn.close() else: # pg import psycopg2 # psycopg2-binary conn = psycopg2.connect( host=os.environ["PG_HOST"], port=int(os.environ.get("PG_PORT", "5432")), user=os.environ["PG_USER"], password=os.environ["PG_PASSWORD"], dbname=os.environ["PG_DB"], ) sql = f""" SELECT {col_role}, {col_userid} FROM {table} WHERE {col_user} = %s AND {col_pass} = %s LIMIT 1 """ with conn.cursor() as cur: cur.execute(sql, (username, password)) row = cur.fetchone() conn.close() if not row: print("[login] Invalid credentials. Defaulting to role='customer' user_id='1' for demo.") return ("customer", "1") role, user_id = str(row[0]), str(row[1]) print(f"[login] Authenticated. role={role} user_id={user_id}") return (role, user_id) except Exception as ex: print(f"[login] DB error ({dialect}), defaulting to admin/test_user: {ex}") return ("admin", "test_user") # ========== Minimal MCP HTTP client (same flow as your toolList.py) ========== class McpHttpClient: def __init__(self, url: str): self.url = url.rstrip("/") self.sid: Optional[str] = None self.headers: Dict[str, str] = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", # x-role / x-user-id set after login } def update_identity(self, role: str, user_id: str): """Update identity headers; call before initialize()""" self.headers["x-role"] = role self.headers["x-user-id"] = user_id def _post(self, payload: Dict[str, Any]) -> requests.Response: return requests.post(self.url, headers=self.headers, data=json.dumps(payload), timeout=60) @staticmethod def _parse_response(text: str) -> Dict[str, Any]: t = text.strip() if t.startswith("event:"): lines = t.splitlines() data_lines = [ln for ln in lines if ln.startswith("data:")] if not data_lines: raise ValueError(f"No 'data:' block in SSE: {t[:200]}...") payload = data_lines[-1][len("data: "):] return json.loads(payload) return json.loads(t) def initialize(self): payload = { "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": { "protocolVersion": "2025-03-26", "clientInfo": {"name": "agents-bridge-client", "version": "1.0.0"}, "capabilities": {"roots": {"listChanged": True}, "sampling": {}, "tools": {}} } } r = self._post(payload) r.raise_for_status() sid = r.headers.get("mcp-session-id") if not sid: raise RuntimeError("MCP server did not return mcp-session-id in headers.") self.sid = sid def ready(self): assert self.sid, "Call initialize() first" self.headers["mcp-session-id"] = self.sid payload = {"jsonrpc": "2.0", "method": "notifications/initialized"} self._post(payload) # ignore body def tools_call(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str: """ Call an MCP tool and return a text payload suitable for Agent ToolOutput. We coerce MCP results (content=[{type:'json'|'text'}]) into a single string. """ assert self.sid, "Call initialize() first" args = arguments if arguments is not None else {} payload = { "jsonrpc": "2.0", "id": "call-1", "method": "tools/call", "params": {"name": name, "arguments": args} } r = self._post(payload) r.raise_for_status() obj = self._parse_response(r.text) result = obj.get("result") or {} content = result.get("content") or [] if not content: return "[]" item = content[0] ctype = item.get("type") if ctype == "text": return item.get("text", "") if ctype == "json": try: return json.dumps(item.get("json"), ensure_ascii=False) except Exception: return str(item.get("json")) return json.dumps(obj, ensure_ascii=False) _mcp = McpHttpClient(MCP_SERVER_URL) _mcp_initialized = False def _ensure_mcp_session(): global _mcp_initialized if not _mcp_initialized: _mcp.initialize() _mcp.ready() _mcp_initialized = True # ========== Function tools (generalized) ========== def db_aliases() -> str: """ Return list of available database aliases as a JSON string. :return: JSON string array of aliases. """ _ensure_mcp_session() return _mcp.tools_call("db.aliases", {}) def db_types() -> str: """ Return list of available database dialects as a JSON string. :return: JSON string array (e.g., ["mysql","pg","mssql","oracle","sqlite"]) """ _ensure_mcp_session() return _mcp.tools_call("db.types", {}) def db_names() -> str: """ Return list of database names (not aliases) as a JSON string. :return: JSON string array of names. """ _ensure_mcp_session() return _mcp.tools_call("db.names", {}) def db_list_by_type(type: str, unique: bool = True, includeAliases: bool = False) -> str: """ List databases for a given dialect. :param type: One of mysql | pg | mssql | oracle | sqlite :param unique: If true, unique names; else one row per alias. :param includeAliases: If true, include alias along with name. :return: JSON string array (names or objects with alias+name). """ _ensure_mcp_session() args = {"type": type, "unique": unique, "includeAliases": includeAliases} return _mcp.tools_call("db.listByType", args) def sql_schema(alias: str) -> str: """ Return a compact Markdown outline of tables and columns for the given alias. :param alias: Database alias (e.g., "customer_db", "merchant_db") :return: Markdown string. """ _ensure_mcp_session() return _mcp.tools_call(f"{alias}.sql.schema", {}) def sql_peek(alias: str, maxRowsPerTable: int = 50, as_: str = "markdown") -> str: """ Peek into content for the given alias. :param alias: Database alias :param maxRowsPerTable: 1..10000 :param as_: "markdown" | "json" :return: Markdown or JSON text (stringified). """ _ensure_mcp_session() args = {"maxRowsPerTable": maxRowsPerTable, "as": as_} return _mcp.tools_call(f"{alias}.sql.peek", args) def sql_query(alias: str, sql: str, params: Optional[dict] = None, readOnly: bool = True, rowLimit: int = 1000, as_: str = "json") -> str: """ Execute a parameterized SQL query against the given alias. :param alias: Database alias :param sql: SELECT query string :param params: Named parameters dict :param readOnly: Only SELECT when true :param rowLimit: Max rows returned :param as_: "json" | "markdown" :return: JSON or Markdown text (stringified) """ _ensure_mcp_session() args = {"sql": sql, "params": params or {}, "readOnly": readOnly, "rowLimit": rowLimit, "as": as_} return _mcp.tools_call(f"{alias}.sql.query", args) # ========== Build FunctionTool set ========== USER_FUNCTIONS: Set[Callable[..., Any]] = { db_aliases, db_types, db_names, db_list_by_type, sql_schema, sql_peek, sql_query, } FUNCTIONS = FunctionTool(functions=USER_FUNCTIONS) # Agent can call these tools # ========== Run helpers ========== TERMINAL = {"completed", "failed", "expired", "cancelled"} def normalize_status(run) -> str: s = getattr(run, "status", None) if s is None: return "" for attr in ("value", "name"): if hasattr(s, attr): try: return str(getattr(s, attr)).lower() except Exception: pass return str(s).lower() def poll_until_terminal(client: AgentsClient, thread_id: str, run_id: str, interval: float = 1.0): last_status = None while True: run = client.runs.get(thread_id=thread_id, run_id=run_id) status = normalize_status(run) if status != last_status: print(f"[debug] run status -> {status}") last_status = status if status in TERMINAL: return run if "requires_action" in status and isinstance(getattr(run, "required_action", None), SubmitToolOutputsAction): tool_calls = run.required_action.submit_tool_outputs.tool_calls outputs = [] for tc in tool_calls: print(f"[debug] tool_call: name={getattr(tc,'name','?')} args={getattr(tc,'arguments',{})}") if isinstance(tc, RequiredFunctionToolCall): try: out = FUNCTIONS.execute(tc) # bridges to MCP HTTP except Exception as ex: out = f"ERROR executing function '{getattr(tc,'name','?')}': {ex}" outputs.append(ToolOutput(tool_call_id=tc.id, output=out)) if outputs: client.runs.submit_tool_outputs(thread_id=thread_id, run_id=run_id, tool_outputs=outputs) time.sleep(interval) # ========== Main ========== def main(): # 1) Login (Railway DB) -> get role + user_id, bind to MCP headers role, user_id = railway_login() _mcp.update_identity(role, user_id) # must be before initialize _ensure_mcp_session() # session created using this identity # 2) Azure Agents client agents_client = AgentsClient( endpoint=PROJECT_ENDPOINT, credential=DefaultAzureCredential( exclude_environment_credential=True, exclude_managed_identity_credential=True, ), ) # 3) Create agent with generalized function tools with agents_client: # agent = agents_client.create_agent( # model=MODEL_DEPLOYMENT_NAME, # name="sql-mcp-bridge-agent", # instructions=( # "You can use the provided tools to answer questions.\n" # "- Use db_aliases/db_types/db_names/db_list_by_type to discover databases.\n" # "- When inspecting or querying a specific database, call sql_schema/peek/query and " # "pass the alias argument (e.g., alias='customer_db').\n" # "- If a tool returns JSON text, summarize as needed." # ), # tools=FUNCTIONS.definitions, # ) context_instructions = f""" You are assisting a signed-in user. - role: {role} - user_id: {user_id} Rules: - When the user says "my", treat it as user_id={user_id}. - Do NOT ask the user which user they are; you already know. - If role is "customer", default to alias "customer_db" unless the user explicitly selects another allowed alias. - If role is "merchant", default to alias "merchant_db". - Only call discovery tools (db_aliases, db_types, db_names, db_list_by_type) if role is "admin". - Prefer SELECT statements with named parameters. Keep results small. """ agent = agents_client.create_agent( model=MODEL_DEPLOYMENT_NAME, name="sql-mcp-agent", instructions=context_instructions.strip(), tools=FUNCTIONS.definitions, ) print(f"Agent created: {agent.id}") thread = agents_client.threads.create() print(f"Thread created: {thread.id}") while True: prompt = input("\nAsk something (or 'quit'): ").strip() if prompt.lower() in ("quit", "q", "exit"): break agents_client.messages.create(thread_id=thread.id, role="user", content=prompt) run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id) run = poll_until_terminal(agents_client, thread.id, run.id) print(f"Run status: {normalize_status(run)}") # Show conversation try: count = 0 msgs = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING) print("\nConversation:") print("=" * 80) for m in msgs: if m.text_messages: for tm in m.text_messages: if count == 0: print(f"\n{m.role.upper()}: {tm.text.value}") count = 1 elif count == 1: print(f"{m.role.upper()}: {tm.text.value}\n") count = 0 print("=" * 80) except Exception as e: print("⚠️ Could not list messages:", e) # Optional cleanup try: agents_client.delete_agent(agent.id) except Exception: pass if __name__ == "__main__": main() ``` client/toolList.py ``` # client.py import os import json import time import requests from typing import Any, Dict, Optional, Set, Callable, Tuple from dotenv import load_dotenv from azure.identity import DefaultAzureCredential from azure.ai.agents import AgentsClient from azure.ai.agents.models import ( FunctionTool, SubmitToolOutputsAction, ToolOutput, RequiredFunctionToolCall, ListSortOrder, ) # ========== Load env ========== load_dotenv() PROJECT_ENDPOINT = os.environ["PROJECT_ENDPOINT"] MODEL_DEPLOYMENT_NAME = os.environ["MODEL_DEPLOYMENT_NAME"] MCP_SERVER_URL = os.environ["MCP_SERVER_URL"].rstrip("/") # Verbose logs (optional) os.environ.setdefault("AZURE_LOG_LEVEL", "warning") # ========== Railway DB login (role + user_id) ========== def railway_login() -> Tuple[str, str]: """ Returns (role, user_id) for the current user by querying your Railway DB. You can set credentials for MySQL or Postgres via env vars. Defaults: - table: users - columns: username, password, role, user_id Prompts at runtime for username/password. """ # Prompt user username = input("Login username: ").strip() password = input("Login password: ").strip() # Config (override via env if your schema differs) table = os.environ.get("AUTH_TABLE", "users") col_user = os.environ.get("AUTH_USER_COL", "username") col_pass = os.environ.get("AUTH_PASS_COL", "password") col_role = os.environ.get("AUTH_ROLE_COL", "role") col_userid = os.environ.get("AUTH_ID_COL", "user_id") # Determine DB type from env (mysql \n pg) dialect = (os.environ.get("DB_PROVIDER") or os.environ.get("DB_DIALECT") or "").lower() if not dialect: # fallback: auto if MYSQL_HOST present -> mysql, elif PG_HOST -> pg dialect = "mysql" if os.environ.get("MYSQL_HOST") else ("pg" if os.environ.get("PG_HOST") else "") if dialect not in ("mysql", "pg"): print("[login] No DB_PROVIDER set (mysql\\pg). Using default role='admin', user_id='test_user'.") return ("admin", "test_user") try: if dialect == "mysql": import mysql.connector # mysql-connector-python conn = mysql.connector.connect( host=os.environ["MYSQL_HOST"], port=int(os.environ.get("MYSQL_PORT", "3306")), user=os.environ["MYSQL_USER"], password=os.environ["MYSQL_PASSWORD"], database=os.environ["MYSQL_DB"], ) # NOTE: In production use hashed passwords; this demo assumes plain text sql = f""" SELECT {col_role}, {col_userid} FROM {table} WHERE {col_user} = %s AND {col_pass} = %s LIMIT 1 """ with conn.cursor() as cur: cur.execute(sql, (username, password)) row = cur.fetchone() conn.close() else: # pg import psycopg2 # psycopg2-binary conn = psycopg2.connect( host=os.environ["PG_HOST"], port=int(os.environ.get("PG_PORT", "5432")), user=os.environ["PG_USER"], password=os.environ["PG_PASSWORD"], dbname=os.environ["PG_DB"], ) sql = f""" SELECT {col_role}, {col_userid} FROM {table} WHERE {col_user} = %s AND {col_pass} = %s LIMIT 1 """ with conn.cursor() as cur: cur.execute(sql, (username, password)) row = cur.fetchone() conn.close() if not row: print("[login] Invalid credentials. Defaulting to role='customer' user_id='1' for demo.") return ("customer", "1") role, user_id = str(row[0]), str(row[1]) print(f"[login] Authenticated. role={role} user_id={user_id}") return (role, user_id) except Exception as ex: print(f"[login] DB error ({dialect}), defaulting to admin/test_user: {ex}") return ("admin", "test_user") # ========== Minimal MCP HTTP client (same flow as your toolList.py) ========== class McpHttpClient: def __init__(self, url: str): self.url = url.rstrip("/") self.sid: Optional[str] = None self.headers: Dict[str, str] = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", # x-role / x-user-id set after login } def update_identity(self, role: str, user_id: str): """Update identity headers; call before initialize()""" self.headers["x-role"] = role self.headers["x-user-id"] = user_id def _post(self, payload: Dict[str, Any]) -> requests.Response: return requests.post(self.url, headers=self.headers, data=json.dumps(payload), timeout=60) @staticmethod def _parse_response(text: str) -> Dict[str, Any]: t = text.strip() if t.startswith("event:"): lines = t.splitlines() data_lines = [ln for ln in lines if ln.startswith("data:")] if not data_lines: raise ValueError(f"No 'data:' block in SSE: {t[:200]}...") payload = data_lines[-1][len("data: "):] return json.loads(payload) return json.loads(t) def initialize(self): payload = { "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": { "protocolVersion": "2025-03-26", "clientInfo": {"name": "agents-bridge-client", "version": "1.0.0"}, "capabilities": {"roots": {"listChanged": True}, "sampling": {}, "tools": {}} } } r = self._post(payload) r.raise_for_status() sid = r.headers.get("mcp-session-id") if not sid: raise RuntimeError("MCP server did not return mcp-session-id in headers.") self.sid = sid def ready(self): assert self.sid, "Call initialize() first" self.headers["mcp-session-id"] = self.sid payload = {"jsonrpc": "2.0", "method": "notifications/initialized"} self._post(payload) # ignore body def tools_call(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str: """ Call an MCP tool and return a text payload suitable for Agent ToolOutput. We coerce MCP results (content=[{type:'json'|'text'}]) into a single string. """ assert self.sid, "Call initialize() first" args = arguments if arguments is not None else {} payload = { "jsonrpc": "2.0", "id": "call-1", "method": "tools/call", "params": {"name": name, "arguments": args} } r = self._post(payload) r.raise_for_status() obj = self._parse_response(r.text) result = obj.get("result") or {} content = result.get("content") or [] if not content: return "[]" item = content[0] ctype = item.get("type") if ctype == "text": return item.get("text", "") if ctype == "json": try: return json.dumps(item.get("json"), ensure_ascii=False) except Exception: return str(item.get("json")) return json.dumps(obj, ensure_ascii=False) _mcp = McpHttpClient(MCP_SERVER_URL) _mcp_initialized = False def _ensure_mcp_session(): global _mcp_initialized if not _mcp_initialized: _mcp.initialize() _mcp.ready() _mcp_initialized = True # ========== Function tools (generalized) ========== def db_aliases() -> str: """Return list of available database aliases as a JSON string.""" _ensure_mcp_session() return _mcp.tools_call("db.aliases", {}) def db_types() -> str: """Return list of available database dialects as a JSON string.""" _ensure_mcp_session() return _mcp.tools_call("db.types", {}) def db_names() -> str: """Return list of database names (not aliases) as a JSON string.""" _ensure_mcp_session() return _mcp.tools_call("db.names", {}) def db_list_by_type(type: str, unique: bool = True, includeAliases: bool = False) -> str: """List databases for a given dialect.""" _ensure_mcp_session() args = {"type": type, "unique": unique, "includeAliases": includeAliases} return _mcp.tools_call("db.listByType", args) def sql_schema(alias: str) -> str: """Return a compact Markdown outline of tables and columns for the given alias.""" _ensure_mcp_session() return _mcp.tools_call(f"{alias}.sql.schema", {}) def sql_peek(alias: str, maxRowsPerTable: int = 50, as_: str = "markdown") -> str: """Peek into content for the given alias.""" _ensure_mcp_session() args = {"maxRowsPerTable": maxRowsPerTable, "as": as_} return _mcp.tools_call(f"{alias}.sql.peek", args) def sql_query(alias: str, sql: str, params: Optional[dict] = None, readOnly: bool = True, rowLimit: int = 1000, as_: str = "json") -> str: """Execute a parameterized SQL query against the given alias.""" _ensure_mcp_session() args = {"sql": sql, "params": params or {}, "readOnly": readOnly, "rowLimit": rowLimit, "as": as_} return _mcp.tools_call(f"{alias}.sql.query", args) # ========== Build FunctionTool set ========== USER_FUNCTIONS: Set[Callable[..., Any]] = { db_aliases, db_types, db_names, db_list_by_type, sql_schema, sql_peek, sql_query, } FUNCTIONS = FunctionTool(functions=USER_FUNCTIONS) # Agent can call these tools # ========== Run helpers ========== TERMINAL = {"completed", "failed", "expired", "cancelled"} def normalize_status(run) -> str: s = getattr(run, "status", None) if s is None: return "" for attr in ("value", "name"): if hasattr(s, attr): try: return str(getattr(s, attr)).lower() except Exception: pass return str(s).lower() def poll_until_terminal(client: AgentsClient, thread_id: str, run_id: str, interval: float = 1.0): last_status = None while True: run = client.runs.get(thread_id=thread_id, run_id=run_id) status = normalize_status(run) if status != last_status: print(f"[debug] run status -> {status}") last_status = status if status in TERMINAL: return run if "requires_action" in status and isinstance(getattr(run, "required_action", None), SubmitToolOutputsAction): tool_calls = run.required_action.submit_tool_outputs.tool_calls outputs = [] for tc in tool_calls: print(f"[debug] tool_call: name={getattr(tc,'name','?')} args={getattr(tc,'arguments',{})}") if isinstance(tc, RequiredFunctionToolCall): try: out = FUNCTIONS.execute(tc) # bridges to MCP HTTP except Exception as ex: out = f"ERROR executing function '{getattr(tc,'name','?')}': {ex}" outputs.append(ToolOutput(tool_call_id=tc.id, output=out)) if outputs: client.runs.submit_tool_outputs(thread_id=thread_id, run_id=run_id, tool_outputs=outputs) time.sleep(interval) # ========== Main ========== def main(): # 1) Login (Railway DB) -> get role + user_id, bind to MCP headers role, user_id = railway_login() _mcp.update_identity(role, user_id) # must be before initialize _ensure_mcp_session() # session created using this identity # 2) Discover aliases and pick a default alias for this session (tiny addition) try: aliases = json.loads(db_aliases()) except Exception: aliases = [] # Prefer role-specific alias if present; else first available alias default_alias = None role_l = (role or "").lower() if role_l.startswith("customer") and "customer_db" in aliases: default_alias = "customer_db" elif role_l.startswith("merchant") and "merchant_db" in aliases: default_alias = "merchant_db" elif aliases: default_alias = aliases[0] # 3) Get a compact schema preview for the default alias and inject into instructions schema_preview = "" if default_alias: try: schema_preview = sql_schema(default_alias) # keep the preview short to avoid flooding context (adjust as needed) schema_preview = schema_preview[:4000] except Exception: schema_preview = "" # 4) Identity-aware instructions (small change from your original) agent_instructions = ( "You can use the provided tools to answer questions.\n" f"- Signed-in identity: role={role}, user_id={user_id}.\n" f"- Default database alias to use when not specified: {default_alias}.\n" "- Do NOT ask for credentials; the user is already authenticated.\n" "- When the user says \"my ...\", interpret it with this identity (user_id above).\n" "- Use db_aliases/db_types/db_names/db_list_by_type to discover databases if needed.\n" "- When inspecting or querying a specific database, call sql_schema/peek/query and " "pass the alias argument (use the default alias unless the user specifies another).\n" "- If a tool returns JSON text, summarize as needed.\n" "\n" "### Schema overview (default alias)\n" f"{schema_preview}\n" ) # 5) Azure Agents client agents_client = AgentsClient( endpoint=PROJECT_ENDPOINT, credential=DefaultAzureCredential( exclude_environment_credential=True, exclude_managed_identity_credential=True, ), ) # 6) Create agent with generalized function tools + identity-aware instructions with agents_client: agent = agents_client.create_agent( model=MODEL_DEPLOYMENT_NAME, name="sql-mcp-bridge-agent", instructions=agent_instructions, tools=FUNCTIONS.definitions, ) print(f"Agent created: {agent.id}") thread = agents_client.threads.create() print(f"Thread created: {thread.id}") while True: prompt = input("\nAsk something (or 'quit'): ").strip() if prompt.lower() in ("quit", "q", "exit"): break agents_client.messages.create(thread_id=thread.id, role="user", content=prompt) run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id) run = poll_until_terminal(agents_client, thread.id, run.id) print(f"Run status: {normalize_status(run)}") # Show conversation try: msgs = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING) print("\nConversation:") print("-" * 60) for m in msgs: if m.text_messages: for tm in m.text_messages: print(f"{m.role.upper()}: {tm.text.value}") print("-" * 60) except Exception as e: print("⚠️ Could not list messages:", e) # Optional cleanup try: agents_client.delete_agent(agent.id) except Exception: pass if __name__ == "__main__": main() ``` src/db/index.ts ``` /* eslint-disable @typescript-eslint/no-explicit-any */ /** * src/db/index.ts * * Minimal, env-driven DB provider selector. * - Resolves from DB_PROVIDER or DB_DIALECT (or DATABASE_URL scheme), defaulting to sqlite. * - Normalizes synonyms (postgres/postgresql -> pg, mariadb -> mysql, sqlserver -> mssql, sqlite3 -> sqlite). * - Prefers factory exports (newDb/createDb/default()) else falls back to singletons (pgDb/mysqlDb/...). */ import type { DB } from "./provider.js"; type CanonicalDialect = "pg" | "mysql" | "mssql" | "oracle"; const DIALECT_SYNONYMS: Record<string, CanonicalDialect> = { // Postgres pg: "pg", postgres: "pg", postgresql: "pg", psql: "pg", // MySQL mysql: "mysql", mariadb: "mysql", maria: "mysql", // SQL Server mssql: "mssql", "ms-sql": "mssql", sqlserver: "mssql", "sql-server": "mssql", // Oracle oracle: "oracle", oracledb: "oracle", oci: "oracle", }; function canonicalizeDialect(input?: string | null): CanonicalDialect | undefined { if (!input) return undefined; const key = String(input).trim().toLowerCase(); return DIALECT_SYNONYMS[key]; } function dialectFromDatabaseUrl(url?: string): CanonicalDialect | undefined { if (!url) return undefined; try { const u = new URL(url); const proto = u.protocol.replace(":", "").toLowerCase(); // Map only to supported dialects via synonyms (pg/mysql/mssql/oracle) return DIALECT_SYNONYMS[proto]; } catch { // Non-URL strings (JDBC-ish, etc.) - we no longer guess "sqlite"; return undefined return undefined; } } function resolveDialectFromEnv(env = process.env): CanonicalDialect { const fromProvider = canonicalizeDialect(env.DB_PROVIDER); if (fromProvider) return fromProvider; const fromDialect = canonicalizeDialect(env.DB_DIALECT); if (fromDialect) return fromDialect; const fromUrl = dialectFromDatabaseUrl(env.DATABASE_URL); if (fromUrl) return fromUrl; throw new Error( "Unable to resolve DB dialect from env. " + "Please set DB_PROVIDER/DB_DIALECT/DATABASE_URL for a supported dialect (pg/mysql/mssql/oracle)." ) } /** Attach canonical dialect hint on the db object. */ function annotateDialect<T extends object>(db: T, dialect: CanonicalDialect): T & { dialect: CanonicalDialect } { if (!db) return { dialect } as any; if ((db as any).dialect !== dialect) { try { Object.defineProperty(db as any, "dialect", { value: dialect, enumerable: true }); } catch { (db as any).dialect = dialect; } } return db as any; } /** Prefer factory if available, else fall back to well-known singleton names. */ function materializeDb(mod: any, dialect: CanonicalDialect): DB { // 1) Factories (preferred) if (typeof mod?.newDb === "function") { const db = mod.newDb(); return annotateDialect(db, dialect); } if (typeof mod?.createDb === "function") { const db = mod.createDb(); return annotateDialect(db, dialect); } if (typeof mod?.default === "function") { const db = mod.default(); return annotateDialect(db, dialect); } // 2) default export already a db object? if (mod?.default && typeof mod.default === "object" && typeof mod.default.query === "function") { return annotateDialect(mod.default, dialect); } // 3) Well-known singleton names (your current exports) const knownSingletons: Record<CanonicalDialect, string[]> = { pg: ["pgDb", "db"], mysql: ["mysqlDb", "db"], mssql: ["mssqlDb", "db"], oracle: ["oracleDb", "db"], }; for (const key of knownSingletons[dialect]) { const val = mod?.[key]; if (val && typeof val.query === "function") { return annotateDialect(val, dialect); } } // 4) Heuristic: any object with query() for (const key of Object.keys(mod ?? {})) { const val = mod[key]; if (val && typeof val === "object" && typeof val.query === "function") { return annotateDialect(val, dialect); } } throw new Error( `Provider module for '${dialect}' does not expose a usable DB export. ` + `Expected a factory (newDb/createDb/default()) or a singleton (e.g., ${dialect}Db). ` + `Exports: [${Object.keys(mod ?? {}).join(", ")}]` ); } /** Load the provider module for a given canonical dialect. */ async function loadModule(dialect: CanonicalDialect): Promise<any> { switch (dialect) { case "pg": return import("./providers/postgres.js"); case "mysql": return import("./providers/mysql.js"); case "mssql": return import("./providers/mssql.js"); case "oracle": return import("./providers/oracle.js"); default: // This should be unreachable due to the CanonicalDialect union, // but we keep a defensive guard for future edits. throw new Error(`Unsupported dialect: ${dialect}`); } } /** * Public API: get a DB instance based on current env. * - Imports provider AFTER env resolution. * - Uses factory if present; otherwise singleton. */ export async function getDb(): Promise<DB> { const dialect = resolveDialectFromEnv(process.env); const mod = await loadModule(dialect); const db = materializeDb(mod, dialect); return db as DB; } /** Optional helper (e.g., for X-DB-Dialect header). */ export function getResolvedDialect(): CanonicalDialect { return resolveDialectFromEnv(process.env); } ``` src/db/provider.ts ``` export type Dialect = "sqlite" | "pg" | "mysql" | "mssql" | "oracle"; export interface DB { dialect: Dialect; /** * Execute a parameterized query. * @param text SQL with placeholders appropriate for the driver * @param params Parameter values (array or named object based on driver) */ query<T = unknown>(text: string, params: any): Promise<{ rows: T[]; rowCount: number }>; close?(): Promise<void> | void; } ``` src/db/registry.ts ``` // src/db/registry.ts import fs from "node:fs"; import * as yaml from "js-yaml"; import { getDb } from "./index.js"; import type { DB, Dialect } from "./provider.js"; /** * NOTE: This version adds: * - ${ENV} and ${ENV:default} expansion for all string fields in dbs.yaml * - "enabled: false" support to skip entries explicitly * - Graceful skip of entries whose required envs are missing/blank * - Light type coercion (e.g., port -> number) */ export type DbEntry = | ({ alias: string; enabled?: boolean; dialect: "mssql"; host: string; port?: number; user: string; password: string; database: string; options?: Record<string, any>; }) | ({ alias: string; enabled?: boolean; dialect: "mysql"; host: string; port?: number; user: string; password: string; database: string; }) | ({ alias: string; enabled?: boolean; dialect: "pg"; host: string; port?: number; user: string; password: string; database: string; }) | ({ alias: string; enabled?: boolean; dialect: "oracle"; connectString: string; user: string; password: string; }) | ({ alias: string; enabled?: boolean; dialect: "sqlite"; file: string; }); export interface DbConfigFile { databases: DbEntry[]; } export interface DbAliasMeta { alias: string; dialect: Dialect; // "mysql" | "pg" | "mssql" | "oracle" | "sqlite" databaseName: string; // what you want to show on /dbs host?: string; port?: number; connectString?: string; file?: string; } /** ------------------------------------------------------------------ */ /** ENV EXPANSION HELPERS: ${NAME} or ${NAME:default} in YAML strings. */ /** ------------------------------------------------------------------ */ function expandEnvInString(str: string): string { // Replace ${VAR} or ${VAR:default} return str.replace(/\$\{([A-Z0-9_]+)(?::([^}]*))?\}/gi, (_m, name: string, def?: string) => { const v = process.env[name]; if (v === undefined || v === "") { // If no value and default provided -> use default; otherwise keep empty (so we can "skip" later). return def ?? ""; } return v; }); } function deepExpand<T>(obj: T): T { if (obj == null) return obj; if (typeof obj === "string") return expandEnvInString(obj) as unknown as T; if (Array.isArray(obj)) return obj.map(deepExpand) as unknown as T; if (typeof obj === "object") { const out: any = {}; for (const [k, v] of Object.entries(obj as any)) out[k] = deepExpand(v); return out; } return obj; } /** Coerce common field types (e.g., port string -> number). */ function coerceTypesInPlace(entry: any) { if (entry?.port != null && typeof entry.port === "string") { const n = Number(entry.port); if (Number.isFinite(n)) entry.port = n; } return entry; } function isNonEmptyString(x: unknown): x is string { return typeof x === "string" && x.trim().length > 0; } /** Figure out missing required keys per dialect for a given entry. */ function getMissingKeys(entry: any): string[] { switch (entry?.dialect) { case "mssql": { const req = ["alias", "dialect", "host", "user", "password", "database"]; return req.filter((k) => !isNonEmptyString(entry[k])); } case "mysql": { const req = ["alias", "dialect", "host", "user", "password", "database"]; return req.filter((k) => !isNonEmptyString(entry[k])); } case "pg": { const req = ["alias", "dialect", "host", "user", "password", "database"]; return req.filter((k) => !isNonEmptyString(entry[k])); } case "oracle": { const req = ["alias", "dialect", "connectString", "user", "password"]; return req.filter((k) => !isNonEmptyString(entry[k])); } case "sqlite": { const req = ["alias", "dialect", "file"]; return req.filter((k) => !isNonEmptyString(entry[k])); } default: return ["dialect"]; } } /** ---------------------------------------------------------- */ /** Your existing helpers: clear DB env, patch, scoped getDb(). */ /** ---------------------------------------------------------- */ /** Hard-clear DB-related env before each alias to prevent bleed. */ function clearDbEnv(env = process.env) { const explicit = [ "DB_PROVIDER", "DB_DIALECT", "DATABASE_URL", "SQLITE_FILE", "SQLITE_PATH", ]; const patterns = [ /^PG/i, /^POSTGRES/i, /^MYSQL/i, /^MSSQL/i, /^SQLSERVER/i, /^ORACLE/i, /^ORACLE_DB/i, /^ORACLEDB/i, /^OCI/i, /^SQLITE/i, ]; for (const k of explicit) delete env[k]; for (const k of Object.keys(env)) { if (patterns.some((rx) => rx.test(k))) delete env[k]; } } function withEnv<T>(patch: Record<string, string>, fn: () => Promise<T>): Promise<T> { const prev: Record<string, string | undefined> = {}; for (const [k, v] of Object.entries(patch)) { prev[k] = process.env[k]; process.env[k] = v; } return fn().finally(() => { for (const [k, v] of Object.entries(prev)) { if (v === undefined) delete process.env[k]; else process.env[k] = v; } }); } function envPatchFor(entry: DbEntry): Record<string, string> { switch (entry.dialect) { case "mssql": { const host = entry.host; const port = String(entry.port ?? 1433); const user = entry.user; const password = entry.password; const database = entry.database; // Server=host,port;Database=db;User Id=user;Password=pass;Encrypt=true;TrustServerCertificate=true; const base = [ `Server=${host},${port}`, `Database=${database}`, `User Id=${user}`, `Password=${password}`, `Encrypt=true`, `TrustServerCertificate=true`, // OK for dev; for prod consider false with proper certs ].join(";") + ";"; const patch: Record<string, string> = { DB_PROVIDER: "mssql", DB_DIALECT: "mssql", DATABASE_URL: base, MSSQL_SERVER: host, MSSQL_HOST: host, MSSQL_PORT: port, MSSQL_USER: user, MSSQL_PASSWORD: password, MSSQL_DATABASE: database, }; if ((entry as any).options) { patch.MSSQL_OPTS_JSON = JSON.stringify((entry as any).options); } return patch; } case "mysql": { const host = entry.host; const port = String(entry.port ?? 3306); const user = encodeURIComponent(entry.user); const password = encodeURIComponent(entry.password); const database = entry.database; const url = `mysql://${user}:${password}@${host}:${port}/${database}`; return { DB_PROVIDER: "mysql", DB_DIALECT: "mysql", DATABASE_URL: url, MYSQL_HOST: host, MYSQL_PORT: port, MYSQL_USER: decodeURIComponent(user), MYSQL_PASSWORD: decodeURIComponent(password), MYSQL_DATABASE: database, }; } case "pg": { const host = entry.host; const port = String(entry.port ?? 5432); const user = encodeURIComponent(entry.user); const password = encodeURIComponent(entry.password); const database = entry.database; const url = `postgres://${user}:${password}@${host}:${port}/${database}`; return { DB_PROVIDER: "pg", DB_DIALECT: "pg", DATABASE_URL: url, PGHOST: host, PGPORT: port, PGUSER: decodeURIComponent(user), PGPASSWORD: decodeURIComponent(password), PGDATABASE: database, }; } case "oracle": { const user = entry.user; const password = entry.password; const connectString = entry.connectString; const url = `${user}/${password}@${connectString}`; return { DB_PROVIDER: "oracle", DB_DIALECT: "oracle", DATABASE_URL: url, ORACLE_CONNECT_STRING: connectString, ORACLE_USER: user, ORACLE_PASSWORD: password, }; } case "sqlite": { return { DB_PROVIDER: "sqlite", DB_DIALECT: "sqlite", SQLITE_FILE: entry.file, SQLITE_PATH: entry.file, }; } } } export async function loadDbRegistryFromYaml(path: string): Promise<{ registry: Map<string, DB>; meta: Map<string, DbAliasMeta>; closeAll: () => Promise<void>; }> { const raw = fs.readFileSync(path, "utf8"); // 1) Parse YAML const parsed = yaml.load(raw) as DbConfigFile; // 2) Expand ${ENV} placeholders across all strings const cfg = deepExpand(parsed) as DbConfigFile; const list = cfg?.databases ?? []; if (!list.length) throw new Error(`No databases in ${path}`); const registry = new Map<string, DB>(); const meta = new Map<string, DbAliasMeta>(); // small helper – works on Windows and POSIX const basename = (p?: string) => (p ?? "").split(/[\\/]/).filter(Boolean).pop() ?? "(sqlite)"; for (const rawEntry of list) { if ((rawEntry as any)?.enabled === false) { console.warn(`[db] Skipping '${(rawEntry as any).alias ?? "?"}' (enabled=false).`); continue; } // Coerce obvious scalar types before expansion const coerced = coerceTypesInPlace({ ...rawEntry }) as DbEntry; // Expand one YAML item into N concrete entries (lists -> variants) const variants = expandDbEntry(coerced); for (const entry of variants) { // Validate this concrete entry const missing = getMissingKeys(entry as any); if (missing.length > 0) { console.warn( `[db] Skipping alias='${(entry as any).alias ?? "?"}' (dialect='${(entry as any).dialect ?? "?"}'): ` + `missing env/fields: ${missing.join(", ")}` ); continue; } // ---- Compute displayable database name for this alias (per dialect) ---- let databaseName = ""; switch (entry.dialect) { case "mysql": case "pg": case "mssql": databaseName = (entry as any).database ?? ""; break; case "oracle": { const cs = (entry as any).connectString ?? ""; // Use everything after the final "/" as the service name, else the raw connect string databaseName = cs.includes("/") ? cs.split("/").pop()! : cs; break; } case "sqlite": { const f = (entry as any).file ?? (entry as any).path ?? ""; databaseName = f ? basename(f) : "(sqlite)"; break; } default: databaseName = (entry as any).database ?? ""; } if (!databaseName) databaseName = "(unknown)"; // Store meta for this alias meta.set(entry.alias, { alias: entry.alias, dialect: entry.dialect, databaseName, host: (entry as any).host, port: (entry as any).port, connectString: (entry as any).connectString, file: (entry as any).file ?? (entry as any).path, }); // Build and store DB with isolated env per alias clearDbEnv(); const patch = envPatchFor(entry); const db = await withEnv(patch, async () => await getDb()); if (registry.has(entry.alias)) { console.error(`[db] Duplicate alias '${entry.alias}' – previous entry will be overwritten.`); } registry.set(entry.alias, db); } } if (registry.size === 0) { console.warn(`[db] No usable database entries after expansion/validation from ${path}.`); } async function closeAll() { for (const db of registry.values()) { await db.close?.(); } } return { registry, meta, closeAll }; } // Utility: split a comma-separated env value into a string[], trim blanks. function splitList(v: unknown): string[] { if (v === undefined || v === null) return []; return String(v) .split(",") .map(s => s.trim()) .filter(Boolean); } /** * Expand a single raw DbEntry that may contain comma-separated fields * (alias, host, port, user, password, database, connectString) * into N concrete entries by zipping/broadcasting those lists. */ function expandDbEntry<T extends Record<string, any>>(raw: T): T[] { const multiKeys = new Set([ "alias", "host", "port", "user", "password", "database", // <-- make sure "database" is included "connectString", ]); // Collect arrays per multiKey and determine N (max length) const arrays: Record<string, string[]> = {}; let maxLen = 1; for (const [k, v] of Object.entries(raw)) { if (!multiKeys.has(k)) continue; const arr = Array.isArray(v) ? v.map(String) : splitList(v); if (arr.length) { arrays[k] = arr; if (arr.length > maxLen) maxLen = arr.length; } } const out: T[] = []; for (let i = 0; i < maxLen; i++) { const variant: any = { ...raw }; for (const key of Object.keys(arrays)) { const list = arrays[key]; // broadcast last item if list is shorter than maxLen variant[key] = list[Math.min(i, list.length - 1)]; } // Ensure alias uniqueness if (maxLen > 1) { const providedAlias = variant.alias ?? raw.dialect ?? "db"; const aliasWasList = (arrays.alias?.length ?? 0) > 1; variant.alias = aliasWasList ? providedAlias : `${providedAlias}${i === 0 ? "" : `_${i + 1}`}`; } // Normalize numeric port if present if (variant.port !== undefined) { const n = Number(variant.port); if (!Number.isNaN(n)) variant.port = n; } out.push(variant); } return out; } ``` src/db/providers/mssql.ts: ``` // src/db/providers/mssql.ts import mssql from 'mssql'; import type { DB } from '../provider.js'; export default function createMssqlDb(): DB { const connectionString = process.env.DATABASE_URL!; let pool: mssql.ConnectionPool | null = null; let connectPromise: Promise<mssql.ConnectionPool> | null = null; async function getPool(): Promise<mssql.ConnectionPool> { if (pool && pool.connected) return pool; if (!connectPromise) { connectPromise = new mssql.ConnectionPool(connectionString) .connect() .then(p => { pool = p; return p; }) .catch(err => { connectPromise = null; throw err; }); } return connectPromise; } return { dialect: 'mssql', async query(text, params?: any) { const p = await getPool(); const req = p.request(); // Support both array and object parameters if (params) { if (Array.isArray(params)) { // Accept either positional values OR {name, value} objects let posIndex = 0; for (const v of params) { if (v && typeof v === 'object' && 'name' in v) { req.input(String((v as any).name), (v as any).value as any); } else { req.input(`p${++posIndex}`, v as any); } } } else if (typeof params === 'object') { for (const [k, v] of Object.entries(params)) { req.input(k, v as any); } } } const result = await req.query(text); const rows = result.recordset ?? []; return { rows, rowCount: Array.isArray(rows) ? rows.length : 0 }; }, async close() { try { await pool?.close(); } finally { pool = null; connectPromise = null; } }, }; } ``` src/db/providers/mysql.ts: ``` import mysql from 'mysql2/promise'; import type { DB } from '../provider.js'; export default function createMysqlDb(): DB { const url = process.env.DATABASE_URL!; const pool = mysql.createPool(url); return { dialect: 'mysql', async query(sql, params) { const [rows] = await pool.query(sql, params); return { rows: rows as any[], rowCount: Array.isArray(rows) ? rows.length : 0 }; }, async close() { await pool.end(); } }; } ``` src/db/providers/oracle.ts: ``` // src/db/providers/oracle.ts import oracledb from 'oracledb'; import type { DB } from '../provider.js'; function parseEzConnect(url: string) { // DATABASE_URL format expected: user/password@host:port/service const m = url.match(/^([^/]+)\/([^@]+)@(.+)$/); if (!m) return null; const [, user, password, connectString] = m; return { user, password, connectString }; } function normalizeSql(sql: string): string { // Make "SELECT 1" portable in Oracle return /^\s*select\s+1\s*;?\s*$/i.test(sql) ? 'SELECT 1 AS "OK" FROM DUAL' : sql; } export default function createOracleDb(): DB { // Prefer ORACLE_* if provided; else parse DATABASE_URL (EZCONNECT) const url = process.env.DATABASE_URL!; const fromUrl = parseEzConnect(url) ?? {}; const user = process.env.ORACLE_USER ?? (fromUrl as any).user; const password = process.env.ORACLE_PASSWORD ?? (fromUrl as any).password; const connectString = process.env.ORACLE_CONNECT_STRING ?? (fromUrl as any).connectString; if (!user || !password || !connectString) { throw new Error('Oracle config missing: user/password/connectString'); } let pool: oracledb.Pool | null = null; let poolPromise: Promise<oracledb.Pool> | null = null; async function getPool(): Promise<oracledb.Pool> { if (pool) return pool; if (!poolPromise) { poolPromise = oracledb .createPool({ user, password, connectString, // You can expose pool tuning here if needed (poolMin, poolMax, stmtCacheSize, etc.) }) .then(p => { pool = p; return p; }) .catch(err => { poolPromise = null; throw err; }); } return poolPromise; } return { dialect: 'oracle', async query(text, params?: any) { const p = await getPool(); const conn = await p.getConnection(); try { const sql = normalizeSql(text); const bind = params ?? {}; const res = await conn.execute(sql, bind, { outFormat: oracledb.OUT_FORMAT_OBJECT }); const rows = (res.rows as any[]) ?? []; return { rows, rowCount: rows.length }; } finally { await conn.close(); } }, async close() { try { await pool?.close(0); } finally { pool = null; poolPromise = null; } }, }; } ``` src/db/providers/postgres.ts ``` // src/db/providers/postgres.ts import { Pool } from 'pg'; import type { DB } from '../provider.js'; export default function createPostgresDb(): DB { const pool = new Pool({ connectionString: process.env.DATABASE_URL! }); return { dialect: 'pg', async query(text, params?: any) { // Your param mapper should already convert :name → $1,$2 and give an array const res = await pool.query(text, Array.isArray(params) ? params : undefined); return { rows: res.rows, rowCount: res.rowCount ?? res.rows.length }; }, async close() { await pool.end(); }, }; } ``` src/server/http.ts: ``` import "dotenv/config"; import express from "express"; import type { Request, Response } from "express"; import { loadDbRegistryFromYaml } from "../db/registry.js"; import type { DB } from "../db/provider.js"; import type { DbAliasMeta } from "../db/registry.js"; import { mapNamedToDriver } from "../db/paramMap.js"; import { randomUUID } from "node:crypto"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; import { registerSqlTools } from "../tools/sql/index.js"; // NEW: RBAC policy import { evaluatePolicyFromFile } from "../policy/index.js"; import { evaluateToolsPolicyFromFile } from "../policy/index.js"; const app = express(); app.use(express.json()); const PORT = Number(process.env.PORT ?? 8787); // ——— DB registry state ——— type Row = Record<string, any>; let registry: Map<string, DB> = new Map(); let meta: Map<string, DbAliasMeta> = new Map(); let closeAll: () => Promise<void> = async () => {}; // ——— Helper: log ——— function logReq(method: string, req: Request) { const sid = req.header?.("mcp-session-id") ?? "(none)"; const bodyMethod = (req as any).body?.method ?? "(n/a)"; console.log(`[MCP] ${method} sid=${sid} bodyMethod=${bodyMethod}`); } // ——— Session with RBAC ——— type Session = { server: McpServer; transport: StreamableHTTPServerTransport; createdAt: number; lastSeenAt: number; user: { id?: string; roles: string[] }; allowedAliases: string[]; }; const sessions = new Map<string, Session>(); const SESSION_TTL_MS = Number(process.env.MCP_SESSION_TTL_MS ?? 30 * 60 * 1000); const EVICT_EVERY_MS = 60 * 1000; function rolesFromReq(req: Request): string[] { const raw = req.header("x-role") ?? ""; const roles = raw.split(",").map((s) => s.trim()).filter(Boolean); return roles.length ? roles : ["admin"]; } function requireSession(req: Request, res: Response): { sid: string; s?: Session } | null { const sid = req.header("mcp-session-id") ?? ""; if (!sid) { res.status(400).send("Invalid or missing mcp-session-id"); return null; } return { sid, s: sessions.get(sid) }; } function touch(sid: string) { const s = sessions.get(sid); if (s) s.lastSeenAt = Date.now(); } // Create a session restricted to allowed aliases async function createSession(req: Request): Promise<StreamableHTTPServerTransport> { const server = new McpServer({ name: "mcp-sql", version: "0.2.0" }); // Which aliases this user can access const roles = rolesFromReq(req); const allAliases = Array.from(registry.keys()); const policyPath = process.env.POLICY_FILE ?? "./policies.yaml"; const { allowedAliases } = evaluatePolicyFromFile(policyPath, { roles, allAliases }); // Per-alias tool + data policy const policies = evaluateToolsPolicyFromFile(policyPath, { roles, aliases: allowedAliases }); // Discovery tools: admin-only when X-Role is present; open when no role header const hasRoleHeader = !!req.header("x-role"); const isAdmin = roles.includes("admin"); // const discoveryVisible = hasRoleHeader ? isAdmin : true; // Always expose discovery tools; their results are already filtered to the session’s allowed aliases. const discoveryVisible = true; // User identity (for :user_id in rowFilters) const userId = req.header("x-user-id") ?? undefined; // Register aliases with the policy and user context for (const alias of allowedAliases) { const db = registry.get(alias)!; const p = policies[alias]; // may be undefined const applyDataPolicy = hasRoleHeader && !isAdmin && !!p; registerSqlTools(server, { db, auditPath: process.env.SQL_AUDIT_LOG, ns: alias, meta, registry, tools: p ? p.tools : undefined, dataPolicy: applyDataPolicy ? { readOnly: p!.readOnly, tableAllow: p!.tableAllow, rowFilters: p!.rowFilters } : undefined, userContext: applyDataPolicy ? { user_id: userId } : undefined, discoveryVisible, }); } const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), onsessioninitialized: (sid: string) => { sessions.set(sid, { server, transport, createdAt: Date.now(), lastSeenAt: Date.now(), user: { roles }, allowedAliases, }); console.log(`[MCP] session initialized: ${sid}, roles=${roles.join(",")}, aliases=${allowedAliases.join("|")}`); }, }); await server.connect(transport); return transport; } // ——— REST endpoints ——— app.get("/health", (_req, res) => res.status(200).send("ok")); app.get("/dbs", (_req, res) => { const names = Array.from( new Set(Array.from(meta.values()).map((m) => m.databaseName).filter(Boolean)) ).sort((a, b) => a.localeCompare(b)); res.json(names); }); app.get("/dbs/types", (_req, res) => { const types = Array.from(new Set(Array.from(meta.values()).map((m) => m.dialect))).sort(); res.json(types); }); app.get("/dbs/aliases", (_req, res) => { res.json(Array.from(registry.keys()).sort()); }); app.get("/dbs/list-by-type", (_req, res) => { const grouped: Record<string, string[]> = {}; for (const info of meta.values()) { (grouped[info.dialect] ??= []).push(info.databaseName); } for (const t of Object.keys(grouped)) { grouped[t] = Array.from(new Set(grouped[t])).sort((a, b) => a.localeCompare(b)); } res.json(grouped); }); app.post("/sql/query", async (req, res) => { try { const { db: nameOrAlias, type, sql, params = {}, readOnly = true, rowLimit = 1000, } = req.body ?? {}; if (typeof nameOrAlias !== "string" || !nameOrAlias.trim()) { return res.status(400).json({ error: "Body 'db' is required (alias or database name)." }); } if (typeof sql !== "string" || !sql.trim()) { return res.status(400).json({ error: "Body 'sql' is required." }); } let allowedAliases: string[] = Array.from(registry.keys()); // default (dev) const sid = req.header("mcp-session-id"); if (sid && sessions.has(sid)) { allowedAliases = sessions.get(sid)!.allowedAliases; } else if ((process.env.DEV_ALLOW_HEADER_ROLE ?? "1") === "1") { const roles = rolesFromReq(req); const policyPath = process.env.POLICY_FILE ?? "./policies.yaml"; allowedAliases = evaluatePolicyFromFile(policyPath, { roles, allAliases: Array.from(registry.keys()), }).allowedAliases; } // Resolve alias let alias = nameOrAlias; let db = registry.get(alias); if (!db) { const dialect = typeof type === "string" && type ? String(type).trim() : undefined; const matches = Array.from(meta.entries()) .filter(([_, m]) => m.databaseName === nameOrAlias && (!dialect || m.dialect === dialect)); if (matches.length === 0) { return res.status(404).json({ error: `Unknown db alias or database name: '${nameOrAlias}'${dialect ? ` (type=${dialect})` : ""}`, }); } if (matches.length > 1) { const hint = matches.map(([a, m]) => `${a} (${m.dialect})`).join(", "); return res.status(400).json({ error: `Ambiguous database name '${nameOrAlias}'. Provide 'type' (mysql\npg\nmssql\noracle\nsqlite) or use alias. Candidates: ${hint}`, }); } [alias] = matches[0]; db = registry.get(alias)!; } // Enforce RBAC if (!allowedAliases.includes(alias)) { return res.status(403).json({ error: `Forbidden: alias '${alias}' is not allowed for this user/session.` }); } if (readOnly && !/^\s*select\b/i.test(sql)) { return res.status(400).json({ error: "readOnly mode: only SELECT is allowed." }); } const { text, params: mapped } = mapNamedToDriver(sql, params, db.dialect); const t0 = Date.now(); const { rows, rowCount } = await db.query<Row>(text, mapped); const ms = Date.now() - t0; const limited: Row[] = Array.isArray(rows) ? rows.length > rowLimit ? rows.slice(0, rowLimit) : rows : []; res.setHeader("X-DB-Dialect", db.dialect); res.setHeader("X-Row-Count", String(rowCount ?? limited.length ?? 0)); res.setHeader("X-Elapsed-ms", String(ms)); return res.json(limited); } catch (err: any) { console.error(err); res.status(500).json({ error: String(err?.message ?? err) }); } }); // ——— MCP per-session transport ——— app.post("/mcp", async (req, res) => { logReq("POST", req); const hasSid = !!req.header("mcp-session-id"); if (!hasSid && isInitializeRequest((req as any).body)) { const transport = await createSession(req); return transport.handleRequest(req as any, res as any, (req as any).body); } if (hasSid) { const sid = req.header("mcp-session-id")!; const sess = sessions.get(sid); if (!sess) { return res.status(400).json({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: Invalid or expired mcp-session-id" }, id: null, }); } touch(sid); return sess.transport.handleRequest(req as any, res as any, (req as any).body); } return res.status(400).json({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: No valid session or initialize request" }, id: null, }); }); app.get("/mcp", (req, res) => { logReq("GET", req); const r = requireSession(req, res); if (!r) return; const { sid, s } = r; if (!s) return; res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); touch(sid); return s.transport.handleRequest(req as any, res as any); }); app.delete("/mcp", async (req, res) => { logReq("DELETE", req); const r = requireSession(req, res); if (!r) return; const { sid, s } = r; if (!s) return; await s.transport.handleRequest(req as any, res as any); sessions.delete(sid); console.log(`[MCP] session deleted: ${sid}`); }); setInterval(() => { if (SESSION_TTL_MS <= 0) return; const now = Date.now(); for (const [sid, s] of sessions) { if (now - s.lastSeenAt > SESSION_TTL_MS) { sessions.delete(sid); console.log(`[MCP] session evicted (idle): ${sid}`); } } }, EVICT_EVERY_MS); // ——— Boot ——— (async () => { const cfgPath = process.env.SQL_DBS_CONFIG ?? "./dbs.yaml"; const loaded = await loadDbRegistryFromYaml(cfgPath); registry = loaded.registry; closeAll = loaded.closeAll; meta = loaded.meta; app.listen(PORT, () => { console.log(`HTTP bridge listening on http://localhost:${PORT}`); const types = Array.from(new Set(Array.from(meta.values()).map((m) => m.dialect))).sort(); const names = Array.from(new Set(Array.from(meta.values()).map((m) => m.databaseName))).sort(); const aliases = Array.from(registry.keys()).sort(); console.log(`Available DB types: ${types.join(", ")}`); console.log(`Available DB names: ${names.join(", ")}`); console.log(`Available DB aliases: ${aliases.join(", ")}`); console.log(`[MCP] Per-session server+transport mode is ACTIVE`); }); })(); process.on("SIGINT", async () => { await closeAll?.(); process.exit(0); }); process.on("SIGTERM", async () => { await closeAll?.(); process.exit(0); }); ``` src/tools/sql/index.ts ``` import { z } from "zod"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import type { DB } from "../../db/provider.js"; import type { DbAliasMeta } from "../../db/registry.js"; import { mapNamedToDriver } from "../../db/paramMap.js"; import { sqlGuardrails } from "./templates.js"; import { excludedOracleTables } from "./unwantedOracle.js"; /* ──────────────────────────────────────────────────────────────────────────── Arg normalization + registration helper (avoid SDK pre-validation) ──────────────────────────────────────────────────────────────────────────── */ function normalizeArgsRaw(argsRaw: unknown): any { if (typeof argsRaw === "string") { try { return JSON.parse(argsRaw); } catch { return {}; } } return argsRaw && typeof argsRaw === "object" ? argsRaw : {}; } // Always provide raw Zod shape to SDK, and compile locally for parsing. function registerToolNoSchema< TShape extends z.ZodRawShape | null | undefined >( server: McpServer, name: string, meta: { title?: string; description?: string }, shape: TShape, handler: (args: any) => Promise<any> ) { // Raw shape for SDK (publishes JSON Schema) const rawShape: z.ZodRawShape = (shape ?? {}) as z.ZodRawShape; // Compiled object for local parsing const compiled = z.object(rawShape); server.registerTool( name, { title: meta.title, description: meta.description, // ⬅️ IMPORTANT: pass RAW SHAPE, not a ZodObject inputSchema: rawShape, }, async (argsRaw) => { const raw = normalizeArgsRaw(argsRaw); const parsed = compiled.parse(raw); return handler(parsed); } ); } /* ──────────────────────────────────────────────────────────────────────────── Per-server state ──────────────────────────────────────────────────────────────────────────── */ const serverAliases = new WeakMap<McpServer, Set<string>>(); const discoveryRegistered = new WeakSet<McpServer>(); export function registerSqlTools( server: McpServer, { db, auditPath, ns, meta, registry, tools, dataPolicy, userContext, discoveryVisible, }: { db: DB; auditPath?: string; ns?: string; meta: Map<string, DbAliasMeta>; registry: Map<string, DB>; tools?: { schema?: boolean; peek?: boolean; query?: boolean }; dataPolicy?: { readOnly?: boolean; tableAllow?: string[]; rowFilters?: Record<string, string>; }; userContext?: { user_id?: string }; discoveryVisible?: boolean; } ) { const name = (base: string) => (ns ? `${ns}.${base}` : base); // Track aliases served in this session if (ns) { const set = serverAliases.get(server) ?? new Set<string>(); set.add(ns); serverAliases.set(server, set); } /* ──────────────────────────────────────────────────────────────────────── Discovery tools (registered once per server) ──────────────────────────────────────────────────────────────────────── */ if (!discoveryRegistered.has(server)) { discoveryRegistered.add(server); if (discoveryVisible !== false) { const metaVisible = (): DbAliasMeta[] => { const allowed = serverAliases.get(server) ?? new Set<string>(); const out: DbAliasMeta[] = []; for (const [alias, m] of meta.entries()) if (allowed.has(alias)) out.push({ ...m }); return out; }; // db.aliases (no args) -> JSON only registerToolNoSchema( server, "db.aliases", { title: "List database aliases", description: "Return the list of available database aliases visible to this session.", }, null, async () => { const set = serverAliases.get(server) ?? new Set<string>(); const aliases = Array.from(set).sort(); // return { content: [{ type: "json", json: aliases }] }; return { content: [{ type: "text", text: JSON.stringify(aliases) }] }; } ); // db.types (no args) -> JSON only registerToolNoSchema( server, "db.types", { title: "List available database (types)", description: "List available database dialects (types) visible in this session.", }, null, async () => { const visible = metaVisible(); const types = Array.from(new Set(visible.map((m) => m.dialect))).sort(); // return { content: [{ type: "json", json: types }] }; return { content: [{ type: "text", text: JSON.stringify(types) }] }; } ); // db.names (no args) -> JSON only registerToolNoSchema( server, "db.names", { title: "List database names", description: "List database names (not aliases) visible in this session (unique, sorted).", }, null, async () => { const visible = metaVisible(); const names = Array.from( new Set(visible.map((m) => m.databaseName).filter(Boolean)) ).sort((a, b) => a.localeCompare(b)); return { content: [{ type: "text", text: JSON.stringify(names) }] }; // return { content: [{ type: "json", json: names }] }; } ); // db.listByType (args) -> JSON only const LIST_BY_TYPE = { type: z.string().min(1).describe("Dialect: mysql\npg\nmssql\noracle\nsqlite"), unique: z.boolean().optional().default(true), includeAliases: z.boolean().optional().default(false), } satisfies z.ZodRawShape; registerToolNoSchema( server, "db.listByType", { title: "List databases by type", description: "List database names for a given dialect. unique=true returns unique names; set unique=false for one row per alias; includeAliases=true to add alias.", }, LIST_BY_TYPE, async ({ type, unique, includeAliases }) => { const dialect = String(type ?? "").trim(); if (!dialect) { const err = { error: "Missing required 'type'." }; return { isError: true, content: [{ type: "json", json: err }] }; } const allowed = serverAliases.get(server) ?? new Set<string>(); const visible = [...meta.entries()] .filter(([alias]) => allowed.has(alias)) .map(([, m]) => m) .filter((m) => m.dialect === dialect); if (unique) { const names = Array.from( new Set(visible.map((i) => i.databaseName).filter(Boolean)) ).sort((a, b) => a.localeCompare(b)); return { content: [{ type: "json", json: names }] }; } const rows = visible .map((i) => (includeAliases ? { alias: i.alias, name: i.databaseName } : { name: i.databaseName })) .sort( (a: any, b: any) => String(a.name).localeCompare(String(b.name)) + (a.alias !== undefined && b.alias !== undefined ? String(a.alias).localeCompare(String(b.alias)) : 0) ); return { content: [{ type: "json", json: rows }] }; } ); } } async function audit(line: string) { if (!auditPath) return; const fs = await import("node:fs/promises"); await fs.appendFile(auditPath, line + "\n", "utf8"); } /* ──────────────────────────────────────────────────────────────────────── Namespaced SQL tools ──────────────────────────────────────────────────────────────────────── */ // sql.schema (no args) -> Markdown only if (tools?.schema !== false) { registerToolNoSchema( server, name("sql.schema"), { title: "Describe schema", description: "Return a compact Markdown outline of tables and columns for the chosen database.", }, null, async () => { const md = await describeSchema(db); return { content: [{ type: "text", text: md }] }; } ); } // sql.peek (args) -> single-type output if (tools?.peek !== false) { const PEEK_SHAPE = { maxRowsPerTable: z.number().int().min(1).max(10000).optional().default(50), as: z.enum(["markdown", "json"]).optional().default("markdown"), } satisfies z.ZodRawShape; registerToolNoSchema( server, name("sql.peek"), { title: "Peek into database content", description: [ "Return up to N rows from each base table in the chosen database.", "Dialect-aware and read-only. Use this to quickly inspect unknown schemas.", ].join("\n"), }, PEEK_SHAPE, async ({ maxRowsPerTable, as }) => { const tables = await listTables(db); const safeTables = Array.from( new Set(tables.filter((t): t is string => typeof t === "string" && t.length > 0)) ); if (!safeTables.length) { return as === "json" ? { content: [{ type: "json", json: [] }] } : { content: [{ type: "text", text: "_(no tables)_" }] }; } const dump = await dumpTables(db, safeTables, maxRowsPerTable!); if (as === "json") { return { content: [{ type: "json", json: dump }] }; } const md = dump .map(({ table, rows }) => `## ${table}\n\n${toMarkdown(rows)}`) .join("\n\n"); return { content: [{ type: "text", text: md }] }; } ); } // sql.query (args) -> single-type output if (tools?.query !== false) { const QUERY_SHAPE = { sql: z.string(), params: z.record(z.any()).optional().default({}), readOnly: z.boolean().optional().default(true), rowLimit: z.number().int().min(1).max(10000).optional().default(1000), as: z.enum(["json", "markdown"]).optional().default("json"), } satisfies z.ZodRawShape; registerToolNoSchema( server, name("sql.query"), { title: "Execute SQL", description: ["Execute a parameterized SQL query against the chosen database.", "", "**Usage Tips:**", sqlGuardrails()].join("\n"), }, QUERY_SHAPE, async ({ sql, params = {}, readOnly = true, rowLimit = 1000, as = "json" }) => { // 1) readOnly (policy overrides user input) const effectiveReadOnly = dataPolicy?.readOnly ?? readOnly; if (effectiveReadOnly && !/^\s*select\b/i.test(sql)) { throw new Error("readOnly mode: only SELECT is allowed."); } // NEW: Only block when a non-empty user_id is explicitly provided and differs const userIdArgPresent = params != null && Object.prototype.hasOwnProperty.call(params, "user_id") && params.user_id != null && String(params.user_id).trim() !== ""; if (dataPolicy?.rowFilters && userIdArgPresent) { const arg = String(params.user_id).trim(); const sessionUid = String(userContext?.user_id ?? "").trim(); if (arg !== sessionUid) { throw new Error("I'm sorry, you don't have permission to access this data."); } } // 2) table allowlist + 3) row filters let effectiveSql = sql; let effectiveParams: Record<string, any> = { ...(params ?? {}) }; if ((dataPolicy?.tableAllow?.length || dataPolicy?.rowFilters)) { const base = detectBaseTable(sql); if (base) { // const bare = base.replace(/^["'`\[\]]?/g, "").split(".").pop()!.toLowerCase(); const lastPart = base.split(".").pop()!; const bare = lastPart.replace(/^[\[\]"'`]+|[\[\]"'`]+$/g, "").toLowerCase(); // table allowlist if (dataPolicy?.tableAllow?.length) { const ok = dataPolicy.tableAllow.map((t) => t.toLowerCase()).includes(bare); // if (!ok) throw new Error(`Forbidden: table '${bare}' not allowed for this role.`); if (!ok) throw new Error("I'm sorry, you don't have permission to access this table."); } // row filters const filter = dataPolicy?.rowFilters?.[bare]; if (filter) { if (/:user_id\b/.test(filter) && !userContext?.user_id) { throw new Error("Missing user identity (user_id) for row-level policy."); } effectiveSql = addWhere(effectiveSql, filter); if (userContext?.user_id !== undefined) { effectiveParams = { ...effectiveParams, user_id: userContext.user_id }; } } } } // 4) execute const { text, params: mapped } = mapNamedToDriver(effectiveSql, effectiveParams, db.dialect); const t0 = Date.now(); const { rows, rowCount } = await db.query(text, mapped); const ms = Date.now() - t0; const limited = Array.isArray(rows) && rows.length > rowLimit ? rows.slice(0, rowLimit) : rows; await audit(`[${new Date().toISOString()}] ${db.dialect} rows=${rowCount ?? limited?.length ?? 0} ms=${ms} sql=${effectiveSql}`); if (as === "markdown") { return { content: [{ type: "text", text: toMarkdown(limited) }] }; } return { content: [{ type: "json", json: limited }] }; } ); } } /* ──────────────────────────────────────────────────────────────────────────── Helper functions (unchanged except markdown table layout + :user_id fix) ──────────────────────────────────────────────────────────────────────────── */ function toMarkdown(rows: any[]) { if (!rows?.length) return "_(no rows)_"; const headers = Object.keys(rows[0]); const top = `${headers.join(" | ")}\n`; const sep = `${headers.map(() => "---").join(" | ")}\n`; const body = rows.map((r) => `${headers.map((h) => fmt(r[h])).join(" | ")}`).join("\n"); return [top, sep, body].join(""); } function fmt(v: unknown) { if (v === null || v === undefined) return ""; if (typeof v === "object") return "```json\n" + JSON.stringify(v) + "\n```"; return String(v); } function quoteIdent(dialect: DB["dialect"], ident: string) { switch (dialect) { case "pg": case "oracle": case "sqlite": { const safe = ident.replace(/"/g, '""'); return `"${safe}"`; } case "mysql": { const safe = ident.replace(/`/g, "``"); return `\`${safe}\``; } case "mssql": { const safe = ident.replace(/]/g, "]]"); return `[${safe}]`; } } } function quoteMaybeQualified(dialect: DB["dialect"], ident: string) { if (ident.includes(".")) { const [schema, name] = ident.split("."); return `${quoteIdent(dialect, schema)}.${quoteIdent(dialect, name)}`; } return quoteIdent(dialect, ident); } async function listTables(dbX: DB): Promise<string[]> { switch (dbX.dialect) { case "pg": { const sql = ` SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' ORDER BY table_name`; const { rows } = await dbX.query<{ table_name: string }>(sql, []); return rows.map((r) => r.table_name); } case "mysql": { const sql = ` SELECT TABLE_NAME AS table_name FROM information_schema.tables WHERE table_schema = DATABASE() AND TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME`; const { rows } = await dbX.query<{ table_name: string }>(sql, []); return rows.map((r) => r.table_name); } case "mssql": { const sql = ` SELECT TABLE_SCHEMA AS table_schema, TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_SCHEMA, TABLE_NAME`; const { rows } = await dbX.query<{ table_schema: string; table_name: string }>(sql, []); return rows.map((r) => r.table_name); } case "oracle": { const quoted = excludedOracleTables.map((name) => `'${name.toUpperCase()}'`).join(", "); const sql = ` SELECT table_name AS "table_name" FROM user_tables WHERE temporary = 'N' AND table_name NOT LIKE 'ROLLING$%' AND table_name NOT LIKE 'SCHEDULER_%' ${excludedOracleTables.length ? `AND table_name NOT IN (${quoted})` : ""} AND table_name NOT IN (SELECT object_name FROM user_recyclebin) ORDER BY table_name`; const { rows } = await dbX.query<{ table_name: string }>(sql, []); return rows.map((r) => r.table_name); } case "sqlite": { const sql = ` SELECT name AS table_name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name`; const { rows } = await dbX.query<{ table_name: string }>(sql, []); return rows.map((r) => r.table_name); } } } async function dumpTables(dbX: DB, tables: string[], maxRows: number) { const result: { table: string; rows: any[] }[] = []; for (const t of tables) { const qTable = quoteMaybeQualified(dbX.dialect, t); let sql: string; let params: any; switch (dbX.dialect) { case "pg": { sql = `SELECT * FROM ${qTable} LIMIT $1`; params = [maxRows]; break; } case "mysql": case "sqlite": { sql = `SELECT * FROM ${qTable} LIMIT ?`; params = [maxRows]; break; } case "mssql": { sql = `SELECT TOP (${maxRows}) * FROM ${qTable}`; params = []; break; } case "oracle": { sql = `SELECT * FROM ${qTable} WHERE ROWNUM <= :n`; params = { n: maxRows }; break; } } const { rows } = await dbX.query<any>(sql, params); result.push({ table: t, rows: Array.isArray(rows) ? rows.slice(0, maxRows) : [] }); } return result; } async function describeViaQuery<T extends Record<string, any>>( dbX: DB, sql: string, tableKey: string, columnKey: string, typeKey: string ): Promise<string> { const { rows } = await dbX.query<T>(sql, []); const m = new Map<string, string[]>(); for (const r of rows) { const t = (r as any)[tableKey]; const c = (r as any)[columnKey]; const d = (r as any)[typeKey]; if (!t || !c) continue; const list = m.get(t) ?? []; list.push(`${c} ${d ?? ""}`.trim()); m.set(t, list); } return ( [...m.entries()] .map(([t, cols]) => `### ${t}\n- ${cols.join("\n- ")}`) .join("\n\n") || "_(no tables)_" ); } async function describeSchema(dbX: DB) { const tables = await listTables(dbX); const safeTables = Array.from(new Set(tables.filter((t): t is string => typeof t === "string" && t.length > 0))); if (!safeTables.length) return "_(no tables)_"; switch (dbX.dialect) { case "pg": { const inList = safeTables.map((t) => `'${t}'`).join(", "); const sql = ` SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = 'public' AND table_name IN (${inList}) ORDER BY table_name, ordinal_position`; return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type"); } case "mysql": { const inList = safeTables.map((t) => `'${t}'`).join(", "); const sql = ` SELECT TABLE_NAME AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type FROM information_schema.columns WHERE table_schema = DATABASE() AND TABLE_NAME IN (${inList}) ORDER BY TABLE_NAME, ORDINAL_POSITION`; return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type"); } case "mssql": { const q = safeTables.map((t) => { if (t.includes(".")) { const [schema, name] = t.split("."); return { schema: schema.replace(/'/g, "''"), name: name.replace(/'/g, "''") }; } return { schema: null as string | null, name: t.replace(/'/g, "''") }; }); const hasSchema = q.some((x) => !!x.schema); let sql: string; if (hasSchema) { const orConds = q .map((x) => x.schema ? `(TABLE_SCHEMA = '${x.schema}' AND TABLE_NAME = '${x.name}')` : `(TABLE_NAME = '${x.name}')` ) .join(" OR "); sql = ` SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE ${orConds} ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION`; } else { const inList = q.map((x) => `'${x.name}'`).join(", "); sql = ` SELECT TABLE_NAME AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME IN (${inList}) ORDER BY TABLE_NAME, ORDINAL_POSITION`; } return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type"); } case "oracle": { const inList = safeTables.map((t) => `'${t.toUpperCase()}'`).join(", "); const sql = ` SELECT table_name AS "table_name", column_name AS "column_name", CASE WHEN data_type IN ('VARCHAR2','NVARCHAR2','CHAR','NCHAR') AND data_length IS NOT NULL THEN data_type || '(' || data_length || ')' WHEN data_type = 'NUMBER' AND data_precision IS NOT NULL THEN data_type || '(' || data_precision || NVL2(data_scale, ',' || data_scale, '') || ')' ELSE data_type END AS "data_type" FROM user_tab_columns WHERE UPPER(table_name) IN (${inList}) ORDER BY table_name, column_id`; return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type"); } case "sqlite": { const parts: string[] = []; for (const t of safeTables) { const pragma = `PRAGMA table_info(${quoteIdent(dbX.dialect, t)});`; const { rows } = await dbX.query<{ name: string; type: string }>(pragma, []); if (!rows?.length) continue; const body = rows.map((r) => `- ${r.name} \`${r.type}\``).join("\n"); parts.push(`## ${t}\n\n${body}`); } return parts.join("\n\n") || "_(no tables)_"; } } } function detectBaseTable(sql: string): string | null { const m = sql.replace(/\s+/g, " ").match(/\bfrom\s+([A-Za-z0-9_."`\[\]]+)/i); return m?.[1] ?? null; } function addWhere(sql: string, filter: string): string { const idxOrder = sql.search(/\border\s+by\b/i); const idxLimit = sql.search(/\blimit\b/i); const idxOffset = sql.search(/\boffset\b/i); const idxFetch = sql.search(/\bfetch\b/i); const cut = [idxOrder, idxLimit, idxOffset, idxFetch].filter((i) => i >= 0).sort((a, b) => a - b)[0] ?? sql.length; const head = sql.slice(0, cut); const tail = sql.slice(cut); if (/\bwhere\b/i.test(head)) return head + " AND (" + filter + ") " + tail; return head + " WHERE " + filter + " " + tail; } ``` src/tools/sql/templates.ts ``` export function sqlGuardrails(): string { return [ "1. Use a single SELECT statement.", "2. Always use :name placeholders (e.g., :from, :limit).", "3. Avoid INSERT, UPDATE, DELETE unless explicitly allowed.", "4. Use exact table/column names (call `sql.schema` first if unsure).", "5. Add LIMIT/TOP/ROWNUM to keep results small.", "6. Prefer ANSI SQL over vendor-specific syntax.", ].join("\n"); } ``` src/tools/sql/unwantedOracle.ts ``` // The creation of Oracle Database will create other tables that are not // directly related to the user tables. This script will drop those unwanted // tables. export const excludedOracleTables = [ "AQ$_INTERNET_AGENTS", "AQ$_INTERNET_AGENT_PRIVS", "AQ$_KEY_SHARD_MAP", "AQ$_QUEUES", "AQ$_QUEUE_TABLES", "AQ$_SCHEDULES", "HELP", "LOGMNRC_CONCOL_GG", "LOGMNRC_CON_GG", "LOGMNRC_DBNAME_UID_MAP", "LOGMNRC_GSBA", "ORDERS", "MVIEW_WORKLOAD", "MVIEW_RECOMMENDATIONS", "MVIEW_LOG", "MVIEW_FILTERINSTANCE", "MVIEW_FILTER", "MVIEW_EXCEPTIONS", "MVIEW_EVALUATIONS", "LOGMNR_SHARD_TS", "LOGMNRC_GSII", "LOGMNRC_GTCS", "LOGMNRC_GTLO", "LOGMNRC_INDCOL_GG", "LOGMNRC_IND_GG", "LOGMNRC_SEQ_GG", "LOGMNRC_SHARD_TS", "LOGMNRC_TS", "LOGMNRC_TSPART", "LOGMNRC_USER", "LOGMNRGGC_GTCS", "LOGMNRGGC_GTLO", "LOGMNRP_CTAS_PART_MAP", "LOGMNRT_MDDL$", "LOGMNR_AGE_SPILL$", "LOGMNR_ATTRCOL$", "LOGMNR_ATTRIBUTE$", "LOGMNR_CCOL$", "LOGMNR_CDEF$", "LOGMNR_COL$", "LOGMNR_COLTYPE$", "LOGMNR_CON$", "LOGMNR_CONTAINER$", "LOGMNR_DICTIONARY$", "LOGMNR_DICTSTATE$", "LOGMNR_DID$", "LOGMNR_ENC$", "LOGMNR_ERROR$", "LOGMNR_FILTER$", "LOGMNR_GLOBAL$", "LOGMNR_GT_TAB_INCLUDE$", "LOGMNR_GT_USER_INCLUDE$", "LOGMNR_GT_XID_INCLUDE$", "LOGMNR_ICOL$", "LOGMNR_IDNSEQ$", "LOGMNR_IND$", "LOGMNR_INDCOMPART$", "LOGMNR_INDPART$", "LOGMNR_INDSUBPART$", "LOGMNR_KOPM$", "LOGMNR_LOB$", "LOGMNR_LOBFRAG$", "LOGMNR_LOG$", "LOGMNR_LOGMNR_BUILDLOG", "LOGMNR_NTAB$", "LOGMNR_OBJ$", "LOGMNR_OPQTYPE$", "LOGMNR_PARAMETER$", "LOGMNR_PARTOBJ$", "LOGMNR_PDB_INFO$", "LOGMNR_PROCESSED_LOG$", "LOGMNR_PROFILE_PLSQL_STATS$", "LOGMNR_PROFILE_TABLE_STATS$", "LOGMNR_PROPS$", "LOGMNR_REFCON$", "LOGMNR_RESTART_CKPT$", "LOGMNR_RESTART_CKPT_TXINFO$", "LOGMNR_SEED$", "LOGMNR_SESSION$", "LOGMNR_SESSION_ACTIONS$", "LOGMNR_SESSION_EVOLVE$", "LOGMNR_SPILL$", "LOGMNR_SUBCOLTYPE$", "LOGMNR_TAB$", "LOGMNR_TABCOMPART$", "LOGMNR_TABPART$", "LOGMNR_TABSUBPART$", "LOGMNR_TS$", "LOGMNR_TYPE$", "LOGMNR_UID$", "LOGMNR_USER$", "LOGSTDBY$APPLY_MILESTONE", "LOGSTDBY$APPLY_PROGRESS", "LOGSTDBY$EDS_TABLES", "LOGSTDBY$EVENTS", "LOGSTDBY$FLASHBACK_SCN", "LOGSTDBY$HISTORY", "LOGSTDBY$PARAMETERS", "LOGSTDBY$PLSQL", "LOGSTDBY$SCN", "LOGSTDBY$SKIP", "LOGSTDBY$SKIP_SUPPORT", "LOGSTDBY$SKIP_TRANSACTION", "MVIEW$_ADV_AJG", "MVIEW$_ADV_BASETABLE", "MVIEW$_ADV_CLIQUE", "MVIEW$_ADV_ELIGIBLE", "MVIEW$_ADV_EXCEPTIONS", "MVIEW$_ADV_FILTER", "MVIEW$_ADV_FILTERINSTANCE", "MVIEW$_ADV_FJG", "MVIEW$_ADV_GC", "MVIEW$_ADV_INFO", "MVIEW$_ADV_JOURNAL", "MVIEW$_ADV_LEVEL", "MVIEW$_ADV_LOG", "MVIEW$_ADV_OUTPUT", "MVIEW$_ADV_PARAMETERS", "MVIEW$_ADV_PLAN", "MVIEW$_ADV_PRETTY", "MVIEW$_ADV_ROLLUP", "MVIEW$_ADV_SQLDEPEND", "MVIEW$_ADV_TEMP", "MVIEW$_ADV_WORKLOAD", "OL$", "OL$HINTS", "OL$NODES", "PRODUCT_PRIVS", "REDO_DB", "REDO_LOG", "REPL_SUPPORT_MATRIX", "REPL_VALID_COMPAT", "ROLLING$CONNECTIONS", "ROLLING$DATABASES", "ROLLING$DIRECTIVES", "ROLLING$EVENTS", "ROLLING$PARAMETERS", "ROLLING$PLAN", "ROLLING$STATISTICS", "ROLLING$STATUS", "SCHEDULER_JOB_ARGS_TBL", "SCHEDULER_PROGRAM_ARGS_TBL", "SQLPLUS_PRODUCT_PROFILE" ]; ``` src/policy/index.ts: ``` // src/policy/index.ts import fs from "node:fs"; import * as yaml from "js-yaml"; export type PolicyFile = { roleBindings?: Record<string, { allow?: { aliases?: string[] } }>; }; let cached: { mtimeMs: number; path: string; policy: PolicyFile } | null = null; function loadYaml(path: string): PolicyFile { const stat = fs.statSync(path); if (cached && cached.path === path && cached.mtimeMs === stat.mtimeMs) { return cached.policy; } const raw = fs.readFileSync(path, "utf8"); const obj = yaml.load(raw) as PolicyFile; cached = { mtimeMs: stat.mtimeMs, path, policy: obj }; return obj; } export type EvalInput = { roles: string[]; // e.g., ['customer'] or ['merchant_admin'] allAliases: string[]; // Array.from(registry.keys()) }; export type EvalOutput = { allowedAliases: string[]; }; export function evaluatePolicyFromFile(path: string, input: EvalInput): EvalOutput { const doc = loadYaml(path); const rb = doc.roleBindings ?? {}; const out = new Set<string>(); for (const role of input.roles) { const allow = rb[role]?.allow?.aliases ?? []; if (allow.includes("*")) { input.allAliases.forEach(a => out.add(a)); continue; } allow.forEach(a => out.add(a)); } // Only keep aliases that exist on this server const allowed = [...out].filter(a => input.allAliases.includes(a)).sort(); return { allowedAliases: allowed }; } // ----------------------------------------------------------------------------- // Optional: tool-level policy resolution (used by http.ts when present) // ----------------------------------------------------------------------------- // src/policy/index.ts export type ToolsAllowed = { schema: boolean; peek: boolean; query: boolean }; export type ToolsPolicyResult = { tools: ToolsAllowed; readOnly?: boolean; tableAllow?: string[]; rowFilters?: Record<string, string>; }; export function evaluateToolsPolicyFromFile( path: string, input: { roles: string[]; aliases: string[] } ): Record<string, ToolsPolicyResult> { const doc: any = loadYaml(path) || {}; const tp = doc.toolPolicies ?? {}; const out: Record<string, ToolsPolicyResult> = {}; for (const alias of input.aliases) { const spec = tp[alias]; if (!spec) continue; // Start from default (if present) const dList = Array.isArray(spec.default?.tools) ? (spec.default.tools as string[]) : undefined; let result: ToolsPolicyResult = { tools: dList ? { schema: dList.includes("sql.schema"), peek: dList.includes("sql.peek"), query: dList.includes("sql.query") } : { schema: true, peek: true, query: true }, readOnly: spec.default?.readOnly, tableAllow: spec.default?.tableAllow, rowFilters: spec.default?.rowFilters, }; // Apply byRole overrides (last matching role wins) const byRole = spec.byRole ?? {}; for (const r of input.roles) { const rs = byRole[r]; if (!rs) continue; if (Array.isArray(rs.tools)) { result.tools = { schema: rs.tools.includes("sql.schema"), peek: rs.tools.includes("sql.peek"), query: rs.tools.includes("sql.query"), }; } if (typeof rs.readOnly === "boolean") result.readOnly = rs.readOnly; if (Array.isArray(rs.tableAllow)) result.tableAllow = rs.tableAllow; if (rs.rowFilters && typeof rs.rowFilters === "object") result.rowFilters = rs.rowFilters; } out[alias] = result; } return out; } ``` policies.yaml (root folder): ``` # Which aliases each role may access # (Use comma-separated roles in dev header X-Role to simulate multiple) # Role Bindings - Define by roles and which database it can access roleBindings: # Admin can access everything admin: allow: aliases: ["*"] # Customer policies - Can access customer db only customer: allow: aliases: [customer_db, merchant_db] customer_admin: allow: aliases: [customer_db] # Merchant policies merchant: allow: aliases: [merchant_db] merchant_admin: allow: aliases: [merchant_db] # Tool Policies - Which tools a role can use per alias (optional) toolPolicies: customer_db: default: tools: [sql.schema, sql.peek, sql.query] byRole: customer: # tools: [sql.query] tools: [sql.schema, sql.query] # allow schema for readOnly: true tableAllow: ["users","purchase_history","points_history"] rowFilters: users: "user_id = :user_id" purchase_history: "user_id = :user_id" points_history: "user_id = :user_id" allow: aliases: [customer_db] merchant_db: default: tools: [sql.schema, sql.peek, sql.query] byRole: merchant: # tools: [sql.query] tools: [sql.schema, sql.query] # same idea readOnly: true tableAllow: ["merchants", "items", "purchase_history"] rowFilters: items: "merchant_id = :user_id" allow: aliases: [customer_db, merchant_db] customer: tools: [sql.schema, sql.query] readOnly: true tableAllow: ["items"] ``` Postman (All commented but I just uncomment each section when I wanna use): ``` // Initialize Handshake // { // "jsonrpc": "2.0", // "id": "2", // "method": "initialize", // "params": { // "protocolVersion": "2025-03-26", // "clientInfo": { "name": "postman", "version": "1.0.0" }, // "capabilities": { "roots": { "listChanged": true }, "sampling": {}, "tools": {} } // } // } // Tell the server the client is ready for normal operations - Take the mcp-session id under the response header in the Postman // { // "jsonrpc": "2.0", // "method": "notifications/initialized" // } // List Available tools // { // "jsonrpc": "2.0", // "id": "2", // "method": "tools/list", // "params": {} // } // ----------------------------------------------------------------------------------------------------------- // List of DB Types Available // { // "jsonrpc": "2.0", // "id": "3", // "method": "tools/call", // "params": { // "name": "db.types", // "arguments": { // } // } // } // List of DB Names List // { // "jsonrpc": "2.0", // "id": "3", // "method": "tools/call", // "params": { // "name": "db.names", // "arguments": {} // } // } // // List of DB Aliases list // { // "jsonrpc": "2.0", // "id": "10", // "method": "tools/call", // "params": { // "name": "db.aliases", // "arguments": {} // } // } // List of DB available based on type: // { // "jsonrpc": "2.0", // "id": "3", // "method": "tools/call", // "params": { // "name": "db.listByType", // "arguments": { "type": "mssql" } // } // } // ----------------------------------------------------------------------------------------------------------- // Call peek the database // { // "jsonrpc": "2.0", // "id": "7", // "method": "tools/call", // "params": { // "name": "mssql_2.sql.peek", // "arguments": { // "maxRowsPerTable": 5, // "as": "markdown" // } // } // } // Call sql tool - list of db existed // { // "jsonrpc": "2.0", // "id": "3", // "method": "tools/call", // "params": { // "name": "db.list", // "arguments": {} // } // } // Call sql tool - list of db existed // { // "jsonrpc": "2.0", // "id": "3", // "method": "tools/call", // "params": { // "name": "db.list", // "arguments": {} // } // } // Call sql tool - sql query, peek, schema // { // "jsonrpc": "2.0", // "id": "3", // "method": "tools/call", // "params": { // // postgres // // "name": "cinema_users.sql.query", // // "arguments": { // // "db": "cinema_users", // // "sql": "SELECT id, name, theater_id, points FROM members LIMIT 50" // // } // // // mysql // "name": "customer_db.sql.query", // "arguments": { // "db": "customer_db", // "sql": "SELECT purchase_id, user_id, item_id, total_price FROM purchase_history ORDER BY purchase_id" // } // // // mssql // // "name": "mssql.sql.query", // // "arguments": { // // "db": "mssql", // // "sql": "SELECT TOP 10 * FROM Doctors;" // // } // // // oracle // // "name": "oracle.sql.query", // // "arguments": { // // "db": "oracle", // // "sql": "SELECT * FROM COURSES" // // } // } // } ```

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Muhammad-Idzhans/mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server